[ 
https://issues.apache.org/jira/browse/SPARK-44756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-44756.
-----------------------------------------
    Fix Version/s: 4.0.0
       Resolution: Fixed

Issue resolved by pull request 42426
[https://github.com/apache/spark/pull/42426]

> Executor hangs when RetryingBlockTransferor fails to initiate retry
> -------------------------------------------------------------------
>
>                 Key: SPARK-44756
>                 URL: https://issues.apache.org/jira/browse/SPARK-44756
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle, Spark Core
>    Affects Versions: 3.3.1
>            Reporter: Harunobu Daikoku
>            Assignee: Harunobu Daikoku
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 4.0.0
>
>
> We have observed this issue several times in production: some executors get 
> stuck at BlockTransferService#fetchBlockSync().
> After some investigation, the issue appears to be caused by an unhandled edge 
> case in RetryingBlockTransferor.
> 1. Shuffle transfer fails for whatever reason
> {noformat}
> java.io.IOException: Cannot allocate memory
>       at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>       at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
>       at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>       at sun.nio.ch.IOUtil.write(IOUtil.java:51)
>       at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
>       at org.apache.spark.network.shuffle.SimpleDownloadFile$SimpleDownloadWritableChannel.write(SimpleDownloadFile.java:78)
>       at org.apache.spark.network.shuffle.OneForOneBlockFetcher$DownloadCallback.onData(OneForOneBlockFetcher.java:340)
>       at org.apache.spark.network.client.StreamInterceptor.handle(StreamInterceptor.java:79)
>       at org.apache.spark.network.util.TransportFrameDecoder.feedInterceptor(TransportFrameDecoder.java:263)
>       at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:87)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> {noformat}
> 2. The above exception is caught by 
> [AbstractChannelHandlerContext#invokeChannelRead()|https://github.com/netty/netty/blob/netty-4.1.74.Final/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java#L381],
>  and propagated to the exception handler
> 3. The exception reaches 
> [RetryingBlockTransferor#initiateRetry()|https://github.com/apache/spark/blob/v3.3.1/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java#L178-L180],
>  which tries to initiate a retry
> {noformat}
> 23/08/09 16:58:37 shuffle-client-4-2 INFO RetryingBlockTransferor: Retrying fetch (1/3) for 1 outstanding blocks after 5000 ms
> {noformat}
> 4. Retry initiation fails (in our case, it fails to create a new thread)
> 5. The resulting exception is caught by 
> [AbstractChannelHandlerContext#invokeExceptionCaught()|https://github.com/netty/netty/blob/netty-4.1.74.Final/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java#L305-L309],
>  and is not processed further
> {noformat}
> 23/08/09 16:58:53 shuffle-client-4-2 DEBUG AbstractChannelHandlerContext: An exception java.lang.OutOfMemoryError: unable to create new native thread
>       at java.lang.Thread.start0(Native Method)
>       at java.lang.Thread.start(Thread.java:719)
>       at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
>       at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1378)
>       at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
>       at org.apache.spark.network.shuffle.RetryingBlockTransferor.initiateRetry(RetryingBlockTransferor.java:182)
>       at org.apache.spark.network.shuffle.RetryingBlockTransferor.access$500(RetryingBlockTransferor.java:43)
>       at org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.handleBlockTransferFailure(RetryingBlockTransferor.java:230)
>       at org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.onBlockFetchFailure(RetryingBlockTransferor.java:260)
>       at org.apache.spark.network.shuffle.OneForOneBlockFetcher.failRemainingBlocks(OneForOneBlockFetcher.java:318)
>       at org.apache.spark.network.shuffle.OneForOneBlockFetcher.access$300(OneForOneBlockFetcher.java:55)
>       at org.apache.spark.network.shuffle.OneForOneBlockFetcher$DownloadCallback.onFailure(OneForOneBlockFetcher.java:357)
>       at org.apache.spark.network.client.StreamInterceptor.exceptionCaught(StreamInterceptor.java:56)
>       at org.apache.spark.network.util.TransportFrameDecoder.exceptionCaught(TransportFrameDecoder.java:231)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
> {noformat}
> 6. As a result, the retry never happens, and the executor thread remains stuck 
> at 
> [BlockTransferService#fetchBlockSync()|https://github.com/apache/spark/blob/v3.3.1/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala#L103],
>  waiting for the transfer to complete or fail
> {noformat}
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:242)
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:293)
> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:103)
> org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(BlockManager.scala:1154)
> org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(BlockManager.scala:1098)
> {noformat}
>  
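
The failure mode in steps 3 to 6 can be reproduced in isolation. The sketch
below is not the code from pull request 42426 and does not use Spark or Netty
classes; RetryInitiationSketch, TransferListener, initiateRetryUnsafe,
initiateRetrySafe and completing are hypothetical names chosen for
illustration, and a CompletableFuture stands in for the promise that
fetchBlockSync() waits on. It assumes only that submitting the retry task can
itself throw, and shows one possible way to make sure the waiting side is
still notified.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class RetryInitiationSketch {
    /** Hypothetical stand-in for the listener that completes the caller's promise. */
    interface TransferListener {
        void onSuccess(String blockId);
        void onFailure(String blockId, Throwable cause);
    }

    /** Unsafe variant: if execute() throws, the listener is never notified. */
    static void initiateRetryUnsafe(Executor executor, String blockId, TransferListener listener) {
        executor.execute(() -> listener.onSuccess(blockId));
    }

    /** Safe variant: a failure to schedule the retry is reported as a transfer failure. */
    static void initiateRetrySafe(Executor executor, String blockId, TransferListener listener) {
        try {
            executor.execute(() -> listener.onSuccess(blockId));
        } catch (Throwable t) {
            // Assumption for this sketch: failing fast is preferable to leaving the waiter parked.
            listener.onFailure(blockId, t);
        }
    }

    /** Adapts a future to the listener interface, like the promise behind a blocking fetch. */
    static TransferListener completing(CompletableFuture<String> promise) {
        return new TransferListener() {
            @Override public void onSuccess(String blockId) { promise.complete(blockId); }
            @Override public void onFailure(String blockId, Throwable cause) {
                promise.completeExceptionally(cause);
            }
        };
    }

    public static void main(String[] args) {
        // Simulates an executor that cannot start a thread for the retry task.
        Executor failing = task -> {
            throw new OutOfMemoryError("unable to create new native thread (simulated)");
        };

        CompletableFuture<String> unsafeResult = new CompletableFuture<>();
        try {
            initiateRetryUnsafe(failing, "block-1", completing(unsafeResult));
        } catch (Throwable t) {
            // Stands in for an outer handler that logs the error and moves on.
        }
        // The promise is still pending, so a blocking wait on it would never return.
        System.out.println("unsafe variant completed: " + unsafeResult.isDone());

        CompletableFuture<String> safeResult = new CompletableFuture<>();
        initiateRetrySafe(failing, "block-1", completing(safeResult));
        System.out.println("safe variant failed fast: " + safeResult.isCompletedExceptionally());
    }
}
```

In the unsafe variant the future is never completed, which mirrors the thread
parked in step 6; in the safe variant the scheduling error reaches the
listener, so the waiter sees a failure instead of hanging.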



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
