This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 919ece8ad [CELEBORN-2015][FOLLOWUP] Retry IOException failures for RPC
requests
919ece8ad is described below
commit 919ece8ad28b29f7c715f23c721e812207109742
Author: Sanskar Modi <[email protected]>
AuthorDate: Mon Jun 9 11:53:48 2025 -0700
[CELEBORN-2015][FOLLOWUP] Retry IOException failures for RPC requests
### What changes were proposed in this pull request?
Follow-up PR for https://github.com/apache/celeborn/pull/3286 – handle an
IOException wrapped inside a CelebornException.
### Why are the changes needed?
`org.apache.celeborn.common.util.ThreadUtils#awaitResult` wraps non-timeout
exceptions in a CelebornException, so an underlying IOException is not caught
by the retry logic and the request is never retried.
Example:
```
org.apache.celeborn.common.exception.CelebornRuntimeException:
setupLifecycleManagerRef failed!
at
org.apache.celeborn.client.ShuffleClientImpl.setupLifecycleManagerRef(ShuffleClientImpl.java:1834)
at
org.apache.celeborn.client.ShuffleClient.get(ShuffleClient.java:89)
at
org.apache.spark.shuffle.celeborn.SparkShuffleManager.getWriter(SparkShuffleManager.java:241)
at
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:138)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:556)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:559)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.celeborn.common.exception.CelebornException:
Exception thrown in awaitResult:
at
org.apache.celeborn.common.util.ThreadUtils$.awaitResult(ThreadUtils.scala:320)
at
org.apache.celeborn.common.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:78)
at
org.apache.celeborn.common.rpc.RpcEnv.setupEndpointRefByAddr(RpcEnv.scala:111)
at
org.apache.celeborn.common.rpc.RpcEnv.$anonfun$setupEndpointRef$1(RpcEnv.scala:133)
at
org.apache.celeborn.common.util.Utils$.withRetryOnTimeoutOrIOException(Utils.scala:1306)
at
org.apache.celeborn.common.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:133)
at
org.apache.celeborn.client.ShuffleClientImpl.setupLifecycleManagerRef(ShuffleClientImpl.java:1828)
... 12 more
Caused by: java.net.SocketException: Connection reset
at
java.base/sun.nio.ch.SocketChannelImpl.throwConnectionReset(SocketChannelImpl.java:394)
at
java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:426)
at
org.apache.celeborn.shaded.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:255)
at
org.apache.celeborn.shaded.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
at
org.apache.celeborn.shaded.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:357)
at
org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
at
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at
org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at
org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at
org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
NA
Closes #3315 from s0nskar/ioexception.
Authored-by: Sanskar Modi <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
.../org/apache/celeborn/common/util/Utils.scala | 30 +++++++++++++---------
1 file changed, 18 insertions(+), 12 deletions(-)
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 8e784f790..3fd090e77 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -1282,24 +1282,30 @@ object Utils extends Logging {
def withRetryOnTimeoutOrIOException[T](numRetries: Int, retryWait:
Long)(block: => T): T = {
var retriesLeft = numRetries
+
+ def waitOrThrow(e: Throwable): Unit = {
+ if (retriesLeft > 0) {
+ val retryWaitMs = new Random().nextInt(retryWait.toInt)
+ try {
+ TimeUnit.MILLISECONDS.sleep(retryWaitMs)
+ } catch {
+ case _: InterruptedException =>
+ throw e
+ }
+ } else {
+ throw e
+ }
+ }
+
while (retriesLeft >= 0) {
retriesLeft -= 1
try {
return block
} catch {
+ case e: CelebornException if e.getCause != null &&
e.getCause.isInstanceOf[IOException] =>
+ waitOrThrow(e)
case e @ (_: RpcTimeoutException | _: IOException) =>
- if (retriesLeft > 0) {
- val random = new Random
- val retryWaitMs = random.nextInt(retryWait.toInt)
- try {
- TimeUnit.MILLISECONDS.sleep(retryWaitMs)
- } catch {
- case _: InterruptedException =>
- throw e
- }
- } else {
- throw e
- }
+ waitOrThrow(e)
case e: Exception =>
throw e
}