[ https://issues.apache.org/jira/browse/SPARK-45134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17766969#comment-17766969 ]

Snoot.io commented on SPARK-45134:
----------------------------------

User 'gaoyajun02' has created a pull request for this issue:
https://github.com/apache/spark/pull/43004

> Data duplication may occur when fallback to origin shuffle block
> ----------------------------------------------------------------
>
>                 Key: SPARK-45134
>                 URL: https://issues.apache.org/jira/browse/SPARK-45134
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 3.2.0, 3.3.0, 3.4.0, 3.5.0
>            Reporter: gaoyajun02
>            Priority: Critical
>
> One situation we have found: if the channel is closed while a mergedBlockMeta 
> request is in flight, the callback may be invoked twice, which results in 
> duplicate data for the original shuffle blocks when fetching falls back to them.
>  # The first invocation happens when the channel becomes inactive: the 
> responseHandler executes the callback for all outstandingRpcs.
>  # The second invocation happens after the channel is closed: the listener 
> attached to shuffleClient.writeAndFlush executes the callback.
> Some error logs:
> {code:java}
> 23/09/08 09:22:21 ERROR shuffle-client-7-1 TransportResponseHandler: Still have 1 requests outstanding when connection from host/ip:port is closed
> 23/09/08 09:22:21 ERROR shuffle-client-7-1 PushBasedFetchHelper: Failed to get the meta of push-merged block for (3, 54) from host:port
> java.io.IOException: Connection from host:port closed
>         at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:147)
>         at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>         at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
>         at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
>         at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>         at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
>         at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
>         at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:225)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>         at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
>         at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>         at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
>         at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
>         at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>         at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
>         at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>         at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>         at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>         at java.lang.Thread.run(Thread.java:745)
>
> 23/09/08 09:22:21 ERROR shuffle-client-7-1 PushBasedFetchHelper: Failed to get the meta of push-merged block for (3, 54) from host:port
> java.io.IOException: Failed to send RPC RPC 8079698359363123411 to host/ip:port: java.nio.channels.ClosedChannelException
>         at org.apache.spark.network.client.TransportClient$RpcChannelListener.handleFailure(TransportClient.java:433)
>         at org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:409)
>         at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
>         at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551)
>         at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
>         at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
>         at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
>         at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
>         at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993)
>         at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
>         at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
>         at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
>         at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
>         at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
>         at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
>         at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
>         at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>         at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
>         at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>         at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>         at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.nio.channels.ClosedChannelException
>         at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
>         ... 18 more
> {code}
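The two logs above show the same meta request for (3, 54) failing once through TransportResponseHandler.channelInactive and once through the writeAndFlush listener (RpcChannelListener.handleFailure). A minimal sketch of how such a double completion duplicates the fallback, and how an at-most-once guard suppresses it, assuming a simplified callback model: MetaCallback, OnceCallback, and simulate are hypothetical names, not Spark's API, and this is not necessarily the approach taken in pull request 43004.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class OnceCallbackDemo {

  // Hypothetical, simplified stand-in for an RPC response callback (not Spark's API).
  interface MetaCallback {
    void onSuccess(String meta);
    void onFailure(Throwable cause);
  }

  // Wraps a callback so that at most one completion (success or failure) is delivered,
  // no matter how many code paths report the outcome of the same request.
  static final class OnceCallback implements MetaCallback {
    private final MetaCallback delegate;
    private final AtomicBoolean completed = new AtomicBoolean(false);

    OnceCallback(MetaCallback delegate) {
      this.delegate = delegate;
    }

    @Override
    public void onSuccess(String meta) {
      // compareAndSet lets exactly one caller win, even across threads.
      if (completed.compareAndSet(false, true)) {
        delegate.onSuccess(meta);
      }
    }

    @Override
    public void onFailure(Throwable cause) {
      if (completed.compareAndSet(false, true)) {
        delegate.onFailure(cause);
      }
    }
  }

  // Fires both failure paths from the report for a single meta request and
  // returns what the (hypothetical) fallback handler enqueued.
  static List<String> simulate(boolean guarded) {
    List<String> fallbackQueue = new ArrayList<>();
    MetaCallback fallback = new MetaCallback() {
      @Override
      public void onSuccess(String meta) {
        // Not exercised in this simulation.
      }

      @Override
      public void onFailure(Throwable cause) {
        // On failure, fall back to fetching the original (unmerged) shuffle blocks.
        fallbackQueue.add("original blocks for push-merged block (3, 54)");
      }
    };
    MetaCallback callback = guarded ? new OnceCallback(fallback) : fallback;

    // Path 1: channelInactive fails every outstanding RPC.
    callback.onFailure(new IOException("Connection from host:port closed"));
    // Path 2: the writeAndFlush listener reports that the send itself failed.
    callback.onFailure(new IOException("Failed to send RPC: ClosedChannelException"));
    return fallbackQueue;
  }

  public static void main(String[] args) {
    System.out.println("unguarded fallbacks: " + simulate(false).size()); // 2 (duplicated)
    System.out.println("guarded fallbacks: " + simulate(true).size());    // 1
  }
}
```

The AtomicBoolean guard is used because the two paths may run on different threads; compareAndSet makes the first reporter win without locking. An alternative is to have the second path skip the callback when the request is no longer outstanding; which of these, if either, matches the actual fix is not stated in this comment.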



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
