XiDuo You created SPARK-39291:
---------------------------------

             Summary: Fetch blocks and open stream should not respond a closed 
channel
                 Key: SPARK-39291
                 URL: https://issues.apache.org/jira/browse/SPARK-39291
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 3.4.0
            Reporter: XiDuo You


If user cancel and interrupt a reduce task who is fetching shuffle blocks, the 
channel would be closed. However there may be some ChunkFetchRequest still in 
flight, so the server side TransportRequestHandler would still try to respond 
those ChunkFetchRequest. It gets worser if the reduce stage is big.

 

 
{code:java}
22/05/24 21:29:30 ERROR ChunkFetchRequestHandler: Error sending result 
ChunkFetchFailure[streamChunkId=StreamChunkId[streamId=736493140719,chunkIndex=6],errorString=java.lang.IllegalStateException:
 Requested chunk not available since streamId 736493140719 is closed
    at 
org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:92)
    at 
org.apache.spark.network.server.ChunkFetchRequestHandler.processFetchRequest(ChunkFetchRequestHandler.java:103)
    at 
org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:82)
    at 
org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:51)
    at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at 
io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
    at 
io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
    at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:750)
] to /ip:port; closing connection
java.nio.channels.ClosedChannelException
    at 
io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
    at 
io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
    at 
io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
    at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
    at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
    at 
io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:110)
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
    at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
    at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
    at 
io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
    at 
io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
    at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:750) {code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to