Looks like there's slowness in sending shuffle files; maybe one executor gets
overwhelmed by all the other executors trying to pull data from it?
Try raising `spark.network.timeout` further: we ourselves had to push it to
600s from the default of 120s.
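
Something along these lines on spark-submit: the 600s is what we ended up with,
while the retry values below are just illustrative suggestions (the defaults are
spark.shuffle.io.maxRetries=3 and spark.shuffle.io.retryWait=5s), so tune them
for your workload:

--conf spark.network.timeout=600s
--conf spark.shuffle.io.maxRetries=10
--conf spark.shuffle.io.retryWait=30s

Raising the retry settings alongside the timeout gives an overloaded serving
executor more time to drain its send queue before the reducer gives up and the
task fails with FetchFailedException.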

On Thu, Sep 28, 2017 at 10:19 AM, Ilya Karpov <i.kar...@cleverdata.ru>
wrote:

> Hi,
> I see strange behaviour in my job and can't understand what is wrong:
> the stage that uses shuffle data as its input fails a number of times
> because of org.apache.spark.shuffle.FetchFailedException, seen in the Spark UI:
> FetchFailed(BlockManagerId(8, hostname, 11431, None), shuffleId=1, mapId=50192, reduceId=12698, message=
> FetchFailed(BlockManagerId(8, hostname, 11431, None), shuffleId=1, mapId=3, reduceId=12699, message=
>
> Digging into the logs, I found the following task-failure scenario:
> 1. some shuffle-server-X-Y (important note: the external shuffle service is
> OFF) reports 'Broken pipe' at 2017-09-26T05:40:26.484Z:
> java.io.IOException: Broken pipe
>         at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
>         at sun.nio.ch.FileChannelImpl.transferToDirectlyInternal(FileChannelImpl.java:428)
>         at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:493)
>         at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:608)
>         at io.netty.channel.DefaultFileRegion.transferTo(DefaultFileRegion.java:139)
>         at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:121)
>         at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:287)
>         at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:237)
>         at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:314)
>         at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:802)
>         at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:313)
>         at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:770)
>
> and "chunk send" errors:
> Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=65546478185, chunkIndex=0}, buffer=FileSegmentManagedBuffer{file=/data/1/yarn/nm/usercache/hdfs/appcache/application_1505206571245_2989/blockmgr-9be47304-ffe2-443a-bb10-33a89928f5b9/04/shuffle_1_3_0.data, offset=40858881, length=3208}} to /someClientIP:somePort; closing connection
> with exceptions:
> java.nio.channels.ClosedChannelException
>         at io.netty.channel.AbstractChannel$AbstractUnsafe.write(...)(Unknown Source)
>
> 2. then the client of this shuffle server complains with:
> Connection to some-hostname/someIP:port has been quiet for 240000 ms while
> there are outstanding requests. Assuming connection is dead; please adjust
> spark.network.timeout if this is wrong.
> and then
> Still have 3386 requests outstanding when connection from some-hostname/someIP:11431 is closed
> and then
> java.io.IOException: Connection from shuffleServerHostname/shuffleServerIP:port closed
>         at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)
>         at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:108)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
>         at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
>         at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
>         at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:278)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
>         at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
>         at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
>         at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
>         at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
>
> This fails tasks and the stage several times, and finally the whole job fails.
>
> I also see retry messages: Failed to fetch block shuffle_1_3_12682, and
> will not retry (5 retries).
>
> Such failures occur on different hosts.
>
> I can't say that we were experiencing any network connectivity issues,
> node failures or anything similar. It seems that the connection was dropped
> by some Spark-internal mechanism.
>
> Any guesses about the reason are appreciated!
>
> Spark 2.2.0
>
> Spark config:
> --num-executors 39
> \
> --conf spark.dynamicAllocation.enabled=false
> --conf spark.shuffle.service.enabled=false
> \
> --conf spark.driver.cores=1
> --conf spark.driver.memory=8g
> --conf spark.yarn.driver.memoryOverhead=2048
> --conf spark.driver.maxResultSize=2g
> \
> --conf spark.executor.cores=2
> --conf spark.executor.memory=6g
> --conf spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:-ResizePLAB -XX:ConcGCThreads=6 -XX:ParallelGCThreads=8 -XX:InitiatingHeapOccupancyPercent=30"
> --conf spark.yarn.executor.memoryOverhead=4096
> \
> --conf spark.memory.fraction=0.8
> --conf spark.memory.storageFraction=0.0
> \
> --conf spark.eventLog.compress=true
> --conf spark.eventLog.enabled=true
> --conf spark.eventLog.overwrite=true
> \
> --conf spark.logConf=true
> --conf spark.logLineage=true
> \
> --conf spark.yarn.historyServer.address=address1
> --conf spark.eventLog.dir=address2
> \
> --conf spark.reducer.maxReqsInFlight=10
> --conf spark.shuffle.io.maxRetries=5
> --conf spark.network.timeout=240
>
>
> Input shuffle size: 2.6 TB
> Partitions in the stage: 20480, of which 12768 completed successfully.
>
> --
> Ilya Karpov
> Developer
>
> CleverDATA
> make your data clever
>
>
