This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 919ece8ad [CELEBORN-2015][FOLLOWUP] Retry IOException failures for RPC 
requests
919ece8ad is described below

commit 919ece8ad28b29f7c715f23c721e812207109742
Author: Sanskar Modi <[email protected]>
AuthorDate: Mon Jun 9 11:53:48 2025 -0700

    [CELEBORN-2015][FOLLOWUP] Retry IOException failures for RPC requests
    
    ### What changes were proposed in this pull request?
    Follow-up PR for https://github.com/apache/celeborn/pull/3286 – handles an 
IOException that is wrapped inside a CelebornException.
    
    ### Why are the changes needed?
    
    `org.apache.celeborn.common.util.ThreadUtils#awaitResult` wraps non-timeout 
exceptions in a CelebornException, so an IOException thrown during an RPC request 
is not caught by the retry logic and the request is never retried.
    
    Example:
    ```
    org.apache.celeborn.common.exception.CelebornRuntimeException: 
setupLifecycleManagerRef failed!
            at 
org.apache.celeborn.client.ShuffleClientImpl.setupLifecycleManagerRef(ShuffleClientImpl.java:1834)
            at 
org.apache.celeborn.client.ShuffleClient.get(ShuffleClient.java:89)
            at 
org.apache.spark.shuffle.celeborn.SparkShuffleManager.getWriter(SparkShuffleManager.java:241)
            at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
            at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
            at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
            at org.apache.spark.scheduler.Task.run(Task.scala:138)
            at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:556)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
            at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:559)
            at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
            at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
            at java.base/java.lang.Thread.run(Thread.java:840)
    Caused by: org.apache.celeborn.common.exception.CelebornException: 
Exception thrown in awaitResult:
            at 
org.apache.celeborn.common.util.ThreadUtils$.awaitResult(ThreadUtils.scala:320)
            at 
org.apache.celeborn.common.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:78)
            at 
org.apache.celeborn.common.rpc.RpcEnv.setupEndpointRefByAddr(RpcEnv.scala:111)
            at 
org.apache.celeborn.common.rpc.RpcEnv.$anonfun$setupEndpointRef$1(RpcEnv.scala:133)
            at 
org.apache.celeborn.common.util.Utils$.withRetryOnTimeoutOrIOException(Utils.scala:1306)
            at 
org.apache.celeborn.common.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:133)
            at 
org.apache.celeborn.client.ShuffleClientImpl.setupLifecycleManagerRef(ShuffleClientImpl.java:1828)
            ... 12 more
    Caused by: java.net.SocketException: Connection reset
            at 
java.base/sun.nio.ch.SocketChannelImpl.throwConnectionReset(SocketChannelImpl.java:394)
            at 
java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:426)
            at 
org.apache.celeborn.shaded.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:255)
            at 
org.apache.celeborn.shaded.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
            at 
org.apache.celeborn.shaded.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:357)
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
            at 
org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    NA
    
    Closes #3315 from s0nskar/ioexception.
    
    Authored-by: Sanskar Modi <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../org/apache/celeborn/common/util/Utils.scala    | 30 +++++++++++++---------
 1 file changed, 18 insertions(+), 12 deletions(-)

diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 8e784f790..3fd090e77 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -1282,24 +1282,30 @@ object Utils extends Logging {
 
   def withRetryOnTimeoutOrIOException[T](numRetries: Int, retryWait: 
Long)(block: => T): T = {
     var retriesLeft = numRetries
+
+    def waitOrThrow(e: Throwable): Unit = {
+      if (retriesLeft > 0) {
+        val retryWaitMs = new Random().nextInt(retryWait.toInt)
+        try {
+          TimeUnit.MILLISECONDS.sleep(retryWaitMs)
+        } catch {
+          case _: InterruptedException =>
+            throw e
+        }
+      } else {
+        throw e
+      }
+    }
+
     while (retriesLeft >= 0) {
       retriesLeft -= 1
       try {
         return block
       } catch {
+        case e: CelebornException if e.getCause != null && 
e.getCause.isInstanceOf[IOException] =>
+          waitOrThrow(e)
         case e @ (_: RpcTimeoutException | _: IOException) =>
-          if (retriesLeft > 0) {
-            val random = new Random
-            val retryWaitMs = random.nextInt(retryWait.toInt)
-            try {
-              TimeUnit.MILLISECONDS.sleep(retryWaitMs)
-            } catch {
-              case _: InterruptedException =>
-                throw e
-            }
-          } else {
-            throw e
-          }
+          waitOrThrow(e)
         case e: Exception =>
           throw e
       }

Reply via email to