[ https://issues.apache.org/jira/browse/SPARK-45134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17766969#comment-17766969 ]
Snoot.io commented on SPARK-45134:
----------------------------------

User 'gaoyajun02' has created a pull request for this issue:
https://github.com/apache/spark/pull/43004

> Data duplication may occur when fallback to origin shuffle block
> ----------------------------------------------------------------
>
>                 Key: SPARK-45134
>                 URL: https://issues.apache.org/jira/browse/SPARK-45134
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 3.2.0, 3.3.0, 3.4.0, 3.5.0
>            Reporter: gaoyajun02
>            Priority: Critical
>
> One situation that has been observed: if the channel is closed while
> mergedBlockMeta is being requested, the callback may be triggered twice,
> which results in duplicate data for the original shuffle blocks.
> # The first callback fires when the channel becomes inactive: the
> responseHandler executes the callback for all outstandingRpcs.
> # The second callback fires when the listener attached to
> shuffleClient.writeAndFlush executes the callback after the channel is closed.
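To make the two failure paths above concrete, here is a minimal, self-contained sketch of how a once-only guard could stop the fallback from running twice. It is an illustration only and not necessarily the approach taken in the pull request. `MetaListener`, `OnceOnlyListener` and `DoubleCallbackDemo` are hypothetical names invented for this example; they are not Spark classes, and the real listener interfaces in Spark have different signatures.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a merged-block-meta listener; not Spark's real interface. */
interface MetaListener {
    void onFailure(int shuffleId, int reduceId, Throwable cause);
}

/** Wrapper that forwards only the first failure and silently drops any later one. */
final class OnceOnlyListener implements MetaListener {
    private final MetaListener delegate;
    private final AtomicBoolean fired = new AtomicBoolean(false);

    OnceOnlyListener(MetaListener delegate) {
        this.delegate = delegate;
    }

    @Override
    public void onFailure(int shuffleId, int reduceId, Throwable cause) {
        // compareAndSet succeeds for exactly one caller, even across threads.
        if (fired.compareAndSet(false, true)) {
            delegate.onFailure(shuffleId, reduceId, cause);
        }
    }
}

final class DoubleCallbackDemo {
    /** Simulates both failure paths for one request and returns the fallback fetches queued. */
    static List<String> simulate(boolean guarded) {
        List<String> fallbackFetches = new ArrayList<>();
        // Assumed fallback behaviour: each failure queues a fetch of the original blocks.
        MetaListener fallback = (shuffleId, reduceId, cause) ->
            fallbackFetches.add("original blocks for (" + shuffleId + ", " + reduceId + ")");
        MetaListener listener = guarded ? new OnceOnlyListener(fallback) : fallback;

        // Path 1: the channel becomes inactive and outstanding RPCs are failed.
        listener.onFailure(3, 54, new IOException("Connection closed"));
        // Path 2: the write listener reports the failed send on the closed channel.
        listener.onFailure(3, 54, new IOException("Failed to send RPC"));
        return fallbackFetches;
    }

    public static void main(String[] args) {
        System.out.println("unguarded fallback fetches: " + simulate(false).size());
        System.out.println("guarded fallback fetches:   " + simulate(true).size());
    }
}
```

Without the guard, the same original blocks are queued twice, which is the duplication described in the issue. With the guard, the second failure is dropped. An `AtomicBoolean` is used here rather than a plain flag so the sketch would still hold if the two paths ran on different threads.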
> Some Error Logs:
> {code:java}
> 23/09/08 09:22:21 ERROR shuffle-client-7-1 TransportResponseHandler: Still have 1 requests outstanding when connection from host/ip:prot is closed
> 23/09/08 09:22:21 ERROR shuffle-client-7-1 PushBasedFetchHelper: Failed to get the meta of push-merged block for (3, 54) from host:port
> java.io.IOException: Connection from host:port closed
>     at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:147)
>     at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
>     at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
>     at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
>     at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
>     at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:225)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
>     at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>     at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
>     at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
>     at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
>     at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>     at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>     at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>     at java.lang.Thread.run(Thread.java:745)
>
> 23/09/08 09:22:21 ERROR shuffle-client-7-1 PushBasedFetchHelper: Failed to get the meta of push-merged block for (3, 54) from host:port
> java.io.IOException: Failed to send RPC RPC 8079698359363123411 to host/ip:port: java.nio.channels.ClosedChannelException
>     at org.apache.spark.network.client.TransportClient$RpcChannelListener.handleFailure(TransportClient.java:433)
>     at org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:409)
>     at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
>     at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551)
>     at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
>     at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
>     at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
>     at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
>     at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993)
>     at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
>     at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
>     at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
>     at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
>     at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
>     at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
>     at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
>     at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
>     at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>     at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>     at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.nio.channels.ClosedChannelException
>     at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
>     ... 18 more
> {code}