This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 7b5dc530b [CELEBORN-2092] Inc COMMIT_FILES_FAIL_COUNT when
TimerWriter::close timeout
7b5dc530b is described below
commit 7b5dc530b155953b4b0ad1d036ce61d1d17a3e36
Author: Wang, Fei <[email protected]>
AuthorDate: Thu Jul 31 21:12:21 2025 -0700
[CELEBORN-2092] Inc COMMIT_FILES_FAIL_COUNT when TimerWriter::close timeout
### What changes were proposed in this pull request?
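Increment the worker's COMMIT_FILES_FAIL_COUNT metric when committing a file fails because `TierWriter::close` times out. Also include the timeout duration in the "Wait pending actions timeout" message, and log the replied status code (SHUFFLE_DATA_LOST) when MapperEnd handling fails in LifecycleManager.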
### Why are the changes needed?
1. COMMIT_FILES_FAIL_COUNT stays at 0 even when a job hits SHUFFLE_DATA_LOST
caused by a commit-files failure on the worker, as the logs below show:
Spark executor log:
```
25/07/30 10:10:39 WARN CelebornShuffleReader: Handle fetch exceptions for
0-0org.apache.celeborn.common.exception.CelebornIOException: Failed to load
file group of shuffle 0 partition 441! Request GetReducerFileGroup(0,false,V1)
return SHUFFLE_DATA_LOST for 0.
```
Spark driver log:
```
25/07/30 10:10:38 ERROR ReducePartitionCommitHandler: Failed to handle
stageEnd for 0, lost file!
25/07/30 10:10:38 ERROR ReducePartitionCommitHandler:
For shuffle application_1750652300305_10219240_1-0 partition data lost:
Lost partition 307-0 in worker
[Host:hdc42-mcc10-01-0910-2704-064-tess0028.stratus.rno.ebay.com:RpcPort:9200:PushPort:9202:FetchPort:9201:ReplicatePort:9203]
Lost partition 1289-0 in worker
[Host:hdc42-mcc10-01-0910-2704-064-tess0028.stratus.rno.ebay.com:RpcPort:9200:PushPort:9202:FetchPort:9201:ReplicatePort:9203]
```
Worker log:
```
java.io.IOException: Wait pending actions timeout.
at
org.apache.celeborn.service.deploy.worker.storage.TierWriterBase.waitOnNoPending(TierWriter.scala:158)
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Closes #3403 from turboFei/commit_failed.
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
(cherry picked from commit 604485779ccadfa29dcd7994fe949e6bf0db5255)
Signed-off-by: Wang, Fei <[email protected]>
---
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala | 2 +-
.../scala/org/apache/celeborn/service/deploy/worker/Controller.scala | 1 +
.../org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala | 2 +-
3 files changed, 3 insertions(+), 2 deletions(-)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index b339c75e6..be93a00c7 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1136,7 +1136,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
logDebug(s"Succeed $message")
context.reply(MapperEndResponse(StatusCode.SUCCESS))
case false =>
- logError(s"Failed $message")
+ logError(s"Failed $message, reply ${StatusCode.SHUFFLE_DATA_LOST}.")
context.reply(MapperEndResponse(StatusCode.SHUFFLE_DATA_LOST))
}
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index f01792729..96a18a76a 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -362,6 +362,7 @@ private[deploy] class Controller(
} catch {
case e: IOException =>
logError(s"Commit file for $shuffleKey $uniqueId failed.", e)
+ workerSource.incCounter(WorkerSource.COMMIT_FILES_FAIL_COUNT)
failedIds.add(uniqueId)
}
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index 7c3f1b966..29865ba39 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -155,7 +155,7 @@ abstract class TierWriterBase(
waitTime -= WAIT_INTERVAL_MS
}
if (counter.get > 0 && failWhenTimeout) {
- val ioe = new IOException("Wait pending actions timeout.")
+ val ioe = new IOException(s"Wait pending actions timeout after
$writerCloseTimeoutMs ms.")
notifier.setException(ioe)
throw ioe
}