[ https://issues.apache.org/jira/browse/SPARK-15074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15273241#comment-15273241 ]
Sital Kedia commented on SPARK-15074:
-------------------------------------

Okay, I made a change to cache the index file, and that made shuffle reads twice as fast. I am going to put out a PR for that change soon.

Now I see that the shuffle service is spending most of its time in the FileChannelImpl.transferTo method (see the stack trace below). I wonder if there is a way to speed it up further?

<code>
java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
    at sun.nio.ch.FileChannelImpl.transferToDirectlyInternal(FileChannelImpl.java:427)
    at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:492)
    at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:607)
    at org.apache.spark.network.buffer.LazyFileRegion.transferTo(LazyFileRegion.java:96)
    at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:89)
    at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:254)
    at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:237)
    at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:281)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:761)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:311)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:729)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1127)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:663)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:644)
    at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:663)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:644)
    at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:663)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:693)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:681)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:716)
    at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:954)
    at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:244)
    at org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:184)
    at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:129)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:100)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
</code>

> Spark shuffle service bottlenecked while fetching large amount of intermediate data
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-15074
>                 URL: https://issues.apache.org/jira/browse/SPARK-15074
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle
>    Affects Versions: 1.6.1
>            Reporter: Sital Kedia
>
> While running a job which produces more than 90TB of intermediate data, we find that about 10-15% of the reducer execution time is being spent in shuffle fetch.
> Jstack of the shuffle service reveals that most of the time the shuffle service is reading the index files generated by the mapper.
> {code}
> java.lang.Thread.State: RUNNABLE
>     at java.io.FileInputStream.readBytes(Native Method)
>     at java.io.FileInputStream.read(FileInputStream.java:255)
>     at java.io.DataInputStream.readFully(DataInputStream.java:195)
>     at java.io.DataInputStream.readLong(DataInputStream.java:416)
>     at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:277)
>     at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getBlockData(ExternalShuffleBlockResolver.java:190)
>     at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:85)
>     at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:72)
>     at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:149)
>     at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:102)
>     at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
>     at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>     at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>     at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>     at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>     at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
>     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> The issue is that for each shuffle fetch, we reopen the same index file and read it again. It would be much more efficient if we could avoid opening the same file multiple times and cache its data instead. We can use an LRU cache to hold the index file information. This way we can also limit the number of entries in the cache so that memory use does not grow without bound.
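
For illustration only, the LRU cache proposed in the description could look roughly like the sketch below. It is not the change made for this issue. The class name IndexFileCache, the method getOffsets, and the maxEntries parameter are hypothetical, and the sketch assumes an index file is just a sequence of 8-byte offsets (suggested by the DataInputStream.readLong frame in the quoted stack trace). It relies on java.util.LinkedHashMap in access order with removeEldestEntry to bound the number of cached files.

```java
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of a bounded LRU cache for shuffle index file contents. */
final class IndexFileCache {
    private final Map<File, long[]> cache;

    IndexFileCache(final int maxEntries) {
        // accessOrder = true: each get() moves the entry to the most-recently-used end.
        this.cache = new LinkedHashMap<File, long[]>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<File, long[]> eldest) {
                // Evict the least-recently-used entry once the bound is exceeded.
                return size() > maxEntries;
            }
        };
    }

    /** Returns the offsets stored in the index file, reading it from disk only on a miss. */
    synchronized long[] getOffsets(File indexFile) throws IOException {
        long[] offsets = cache.get(indexFile);
        if (offsets == null) {
            offsets = readOffsets(indexFile);
            cache.put(indexFile, offsets);
        }
        return offsets;
    }

    synchronized boolean isCached(File indexFile) {
        // containsKey does not count as an access, so it leaves the LRU order unchanged.
        return cache.containsKey(indexFile);
    }

    synchronized int size() {
        return cache.size();
    }

    // Assumption: the file holds nothing but big-endian longs, one offset each.
    private static long[] readOffsets(File indexFile) throws IOException {
        int count = (int) (indexFile.length() / Long.BYTES);
        long[] offsets = new long[count];
        try (DataInputStream in = new DataInputStream(
                new BufferedInputStream(Files.newInputStream(indexFile.toPath())))) {
            for (int i = 0; i < count; i++) {
                offsets[i] = in.readLong();
            }
        }
        return offsets;
    }
}
```

With something like this, a fetch handler would call getOffsets(indexFile) and use offsets[reduceId] and offsets[reduceId + 1] to locate the block in the data file. Holding the lock during the disk read keeps the sketch short but serializes misses, so a real implementation would probably want a concurrent, size-bounded cache instead.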