[ 
https://issues.apache.org/jira/browse/SPARK-44772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17794816#comment-17794816
 ] 

Dongjoon Hyun commented on SPARK-44772:
---------------------------------------

Could you try this with Apache Spark 3.5.0?

> Reading blocks from remote executors  causes timeout issue
> ----------------------------------------------------------
>
>                 Key: SPARK-44772
>                 URL: https://issues.apache.org/jira/browse/SPARK-44772
>             Project: Spark
>          Issue Type: Bug
>          Components: EC2, PySpark, Shuffle, Spark Core
>    Affects Versions: 3.1.2
>            Reporter: nebi mert aydin
>            Priority: Blocker
>
> I'm using EMR 6.5 with Spark 3.1.2.
> I'm shuffling 1.5 TiB of data across 3000 executors, each with 4 cores and 
> 23 GB of memory.
> Speculative execution is also enabled.
> {code:java}
> // df.repartition(6000) {code}
> I see lots of failures with 
> {code:java}
> 2023-08-11 01:01:09,846 ERROR org.apache.spark.network.server.ChunkFetchRequestHandler (shuffle-server-4-95): Error sending result ChunkFetchSuccess[streamChunkId=StreamChunkId[streamId=779084003612,chunkIndex=323],buffer=FileSegmentManagedBuffer[file=/mnt3/yarn/usercache/zeppelin/appcache/application_1691438567823_0012/blockmgr-0d82ca05-9429-4ff2-9f61-e779e8e60648/07/shuffle_5_114492_0.data,offset=1836997,length=618]] to /172.31.20.110:36654; closing connection
> java.nio.channels.ClosedChannelException
>       at org.sparkproject.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
>       at org.sparkproject.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
>       at org.sparkproject.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
>       at org.sparkproject.io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:110)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
>       at org.sparkproject.io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:808)
>       at org.sparkproject.io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1025)
>       at org.sparkproject.io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:294)
>       at org.apache.spark.network.server.ChunkFetchRequestHandler.respond(ChunkFetchRequestHandler.java:142)
>       at org.apache.spark.network.server.ChunkFetchRequestHandler.processFetchRequest(ChunkFetchRequestHandler.java:116)
>       at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:107)
>       at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
>       at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
>       at org.sparkproject.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>       at org.sparkproject.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>       at org.sparkproject.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>       at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>       at org.sparkproject.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>       at org.sparkproject.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>       at org.sparkproject.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
>       at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
>       at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
>       at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
>       at org.sparkproject.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
>       at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>       at org.sparkproject.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>       at org.sparkproject.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>       at java.lang.Thread.run(Thread.java:750)
> Caused by: java.io.IOException: Broken pipe
>       at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>       at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>       at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>       at sun.nio.ch.IOUtil.write(IOUtil.java:51)
>       at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
>       at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:148)
>       at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:111)
>       at org.sparkproject.io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:362)
>       at org.sparkproject.io.netty.channel.nio.AbstractNioByteChannel.doWriteInternal(AbstractNioByteChannel.java:235)
>       at org.sparkproject.io.netty.channel.nio.AbstractNioByteChannel.doWrite0(AbstractNioByteChannel.java:209)
>       at org.sparkproject.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:400)
>       at org.sparkproject.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:930)
>       at org.sparkproject.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
>       at org.sparkproject.io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:897)
>       at org.sparkproject.io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
>       at org.sparkproject.io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
>       at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
>       ... 39 more {code}
>  
> I tried disabling TCP segmentation offload (tso) and scatter-gather (sg) on 
> the network interface:
> ```
> sudo ethtool -K eth0 tso off
> sudo ethtool -K eth0 sg off
> ```
> That didn't work. I suspect the external shuffle service is unable to send 
> data to other executors for some reason.
>  
>  
>  
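For readers triaging a similar symptom, the sketch below collects the shuffle-fetch settings that are commonly reviewed when remote block reads time out and renders them as spark-submit arguments. The configuration keys are existing Spark properties, but every value here is an illustrative assumption, the helper name to_submit_args is hypothetical, and nothing in this thread confirms that any of these settings resolves SPARK-44772.

```python
# Shuffle-fetch settings often reviewed for remote block read timeouts.
# ASSUMPTION: all values are illustrative placeholders, not recommendations
# taken from this issue; tune them against your own cluster.
SHUFFLE_FETCH_CONF = {
    "spark.network.timeout": "600s",            # default is 120s
    "spark.shuffle.io.maxRetries": "10",        # default is 3
    "spark.shuffle.io.retryWait": "30s",        # default is 5s
    "spark.reducer.maxReqsInFlight": "64",      # cap concurrent fetch requests
    "spark.reducer.maxBlocksInFlightPerAddress": "128",  # cap blocks per peer
}


def to_submit_args(conf):
    """Render a config mapping as a flat list of spark-submit arguments."""
    args = []
    for key in sorted(conf):
        args += ["--conf", f"{key}={conf[key]}"]
    return args


if __name__ == "__main__":
    print(" ".join(to_submit_args(SHUFFLE_FETCH_CONF)))
```

Capping in-flight requests trades some fetch parallelism for less pressure on each shuffle server, which is why it is usually tried alongside longer timeouts rather than instead of them.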

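When the log contains many errors like the one quoted above, it can help to check whether they cluster on a few peers or shuffle files. The following is a minimal sketch that pulls the shuffle file ids and the remote address out of a ChunkFetchRequestHandler "Error sending result" message. The function names and the regular expression are assumptions written against the single log line in this report, so other Spark versions or message layouts may need adjustments.

```python
import re
from collections import Counter

# Matches the shuffle data file name and the remote peer address.
# ASSUMPTION: the layout follows the single log line quoted in this issue.
FETCH_ERROR = re.compile(
    r"shuffle_(?P<shuffle>\d+)_(?P<map>\d+)_(?P<reduce>\d+)\.data"
    r".*?\bto /(?P<host>[\d.]+):(?P<port>\d+)",
    re.DOTALL,
)


def parse_fetch_error(message):
    """Return the ids and peer found in one error message, or None."""
    m = FETCH_ERROR.search(message)
    if m is None:
        return None
    return {
        "shuffle_id": int(m.group("shuffle")),
        "map_id": int(m.group("map")),
        "reduce_id": int(m.group("reduce")),
        "host": m.group("host"),
        "port": int(m.group("port")),
    }


def failures_per_host(messages):
    """Count parsed fetch errors per remote host, skipping unrelated lines."""
    parsed = (parse_fetch_error(msg) for msg in messages)
    return Counter(p["host"] for p in parsed if p is not None)
```

A count dominated by a handful of hosts would point at specific nodes, while an even spread would be more consistent with a cluster-wide cause such as overall load.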


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
