[ 
https://issues.apache.org/jira/browse/SPARK-15074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-15074:
------------------------------------

    Assignee: Apache Spark

> Spark shuffle service bottlenecked while fetching large amount of 
> intermediate data
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-15074
>                 URL: https://issues.apache.org/jira/browse/SPARK-15074
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle
>    Affects Versions: 1.6.1
>            Reporter: Sital Kedia
>            Assignee: Apache Spark
>
> While running a job which produces more than 90TB of intermediate data, we 
> find that about 10-15% of the reducer execution time is being spent in 
> shuffle fetch. 
> Jstack of the shuffle service reveals that most of the time the shuffle 
> service is reading the index files generated by the mapper. 
> {code}
> java.lang.Thread.State: RUNNABLE
>       at java.io.FileInputStream.readBytes(Native Method)
>       at java.io.FileInputStream.read(FileInputStream.java:255)
>       at java.io.DataInputStream.readFully(DataInputStream.java:195)
>       at java.io.DataInputStream.readLong(DataInputStream.java:416)
>       at 
> org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:277)
>       at 
> org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getBlockData(ExternalShuffleBlockResolver.java:190)
>       at 
> org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:85)
>       at 
> org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:72)
>       at 
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:149)
>       at 
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:102)
>       at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
>       at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>       at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at 
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at 
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at 
> org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
>       at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>       at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> The issue is that for each shuffle fetch, we reopen the same index file again 
> and read it. It would be much efficient, if we can avoid opening the same 
> file multiple times and cache the data. We can use an LRU cache to save the 
> index file information. This way we can also limit the number of entries in 
> the cache so that we don't blow up the memory indefinitely. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to