Copilot commented on code in PR #3642:
URL: https://github.com/apache/celeborn/pull/3642#discussion_r3021680385
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -5442,6 +5443,14 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("3s")
+ val CLIENT_COMMIT_FILE_REQUEST_RETRY_INTERVAL: ConfigEntry[Long] =
+ buildConf("celeborn.client.requestCommitFiles.retryInterval")
+ .categories("client")
+ .doc("Wait interval before next requestCommitFiles RPC retry.")
+ .version("0.7.0")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("10s")
Review Comment:
Config naming is slightly inconsistent with existing retry settings like
`*.retryWait` (e.g., `registerShuffle.retryWait`, `reserveSlots.retryWait`). If
this is intended as a fixed delay between attempts, consider aligning the key
name to `retryWait` for consistency, or clarify in the doc that this is a fixed
interval (not backoff/jitter) to avoid ambiguity for operators.
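A sketch of the aligned entry, assuming the `retryWait` naming is adopted (the constant name and doc wording here are illustrative, not the actual patch):
```scala
// Sketch only: aligns with existing keys such as
// celeborn.client.registerShuffle.retryWait.
val CLIENT_COMMIT_FILE_REQUEST_RETRY_WAIT: ConfigEntry[Long] =
  buildConf("celeborn.client.requestCommitFiles.retryWait")
    .categories("client")
    .doc("Fixed wait interval before the next requestCommitFiles RPC retry. " +
      "No backoff or jitter is applied between attempts.")
    .version("0.7.0")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("10s")
```
Either way, stating "fixed interval" in the doc removes the ambiguity for operators tuning retry behavior.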
##########
client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala:
##########
@@ -512,6 +518,33 @@ abstract class CommitHandler(
message.primaryIds,
message.replicaIds)
}(ec)
+ } else if (isRetry) {
+ val promise = Promise[CommitFilesResponse]()
+ commitRetryScheduler.schedule(
+ new Runnable {
+ override def run(): Unit = {
+ try {
+ worker.endpoint.ask[CommitFilesResponse](message).onComplete {
+ result => promise.complete(result)
+ }(ec)
+ } catch {
+ case e: Exception =>
+ logError(
+ s"Failed to send CommitFiles to worker($worker) " +
+ s"for ${message.shuffleId} after retry delay.",
+ e)
+ promise.success(CommitFilesResponse(
+ StatusCode.REQUEST_FAILED,
+ List.empty.asJava,
+ List.empty.asJava,
+ message.primaryIds,
+ message.replicaIds))
+ }
+ }
+ },
+ retryInterval,
+ TimeUnit.MILLISECONDS)
Review Comment:
If `commitRetryScheduler.schedule(...)` throws (e.g.,
`RejectedExecutionException` after shutdown), the `promise` is never completed
and callers can hang forever waiting on `promise.future`. Wrap the
`schedule(...)` call itself in a try/catch and complete the promise with a
failure/`REQUEST_FAILED` response when scheduling is rejected.
```suggestion
try {
commitRetryScheduler.schedule(
new Runnable {
override def run(): Unit = {
try {
          worker.endpoint.ask[CommitFilesResponse](message).onComplete {
result => promise.complete(result)
}(ec)
} catch {
case e: Exception =>
logError(
s"Failed to send CommitFiles to worker($worker) " +
s"for ${message.shuffleId} after retry delay.",
e)
promise.success(CommitFilesResponse(
StatusCode.REQUEST_FAILED,
List.empty.asJava,
List.empty.asJava,
message.primaryIds,
message.replicaIds))
}
}
},
retryInterval,
TimeUnit.MILLISECONDS)
} catch {
case e: Exception =>
logError(
s"Failed to schedule CommitFiles retry to worker($worker) " +
s"for ${message.shuffleId}.",
e)
promise.success(CommitFilesResponse(
StatusCode.REQUEST_FAILED,
List.empty.asJava,
List.empty.asJava,
message.primaryIds,
message.replicaIds))
}
```
##########
client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala:
##########
@@ -611,4 +644,8 @@ abstract class CommitHandler(
fileGroups.remove(partitionId)
}
}
+
+ def stop(): Unit = {
+ commitRetryScheduler.shutdownNow()
Review Comment:
`shutdownNow()` cancels queued scheduled retry tasks. Any in-flight delayed
retries that already returned a `Future` (backed by a `Promise`) may never run
and therefore may never complete, which can lead to stuck commit operations
during/around shutdown. Prefer a graceful shutdown (`shutdown` + bounded await
termination) or ensure canceled scheduled tasks complete their associated
promises (e.g., reject new retries up-front and complete pending promises with
`REQUEST_FAILED`).
```suggestion
commitRetryScheduler.shutdown()
if (!commitRetryScheduler.awaitTermination(30, TimeUnit.SECONDS)) {
commitRetryScheduler.shutdownNow()
}
```
##########
client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala:
##########
@@ -80,7 +80,11 @@ abstract class CommitHandler(
val ec = ExecutionContext.fromExecutor(sharedRpcPool)
+  private val commitRetryScheduler: ScheduledExecutorService =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("commit-retry-scheduler")
Review Comment:
A dedicated single-thread scheduler is created per `CommitHandler` instance.
If the system creates many commit handlers, this can lead to excess thread
creation and overhead. Consider sharing a scheduler across handlers (e.g.,
owned by `CommitManager` and passed in) or using a shared scheduled executor
while still avoiding blocking the shared RPC pool.
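A minimal sketch of the shared-scheduler variant, assuming `CommitManager` creates the handlers (constructor shapes and the `createHandler` helper are illustrative):
```scala
// Sketch only: CommitManager owns one scheduler and passes it to every
// handler it creates, bounding thread count regardless of handler count.
class CommitManager(/* ... */) {
  private val sharedCommitRetryScheduler: ScheduledExecutorService =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("commit-retry-scheduler")

  private def createHandler(/* ... */): CommitHandler =
    new ReducePartitionCommitHandler(/* ..., */ sharedCommitRetryScheduler)

  def stop(): Unit = {
    // Shut down once here instead of per handler.
    sharedCommitRetryScheduler.shutdown()
    if (!sharedCommitRetryScheduler.awaitTermination(30, TimeUnit.SECONDS)) {
      sharedCommitRetryScheduler.shutdownNow()
    }
  }
}
```
This also moves shutdown responsibility to a single owner, which sidesteps the per-handler `stop()` lifecycle concerns raised above.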
##########
client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala:
##########
@@ -512,6 +518,33 @@ abstract class CommitHandler(
message.primaryIds,
message.replicaIds)
}(ec)
+ } else if (isRetry) {
+ val promise = Promise[CommitFilesResponse]()
+ commitRetryScheduler.schedule(
+ new Runnable {
+ override def run(): Unit = {
+ try {
+ worker.endpoint.ask[CommitFilesResponse](message).onComplete {
Review Comment:
The retry path uses `ask(message)` without applying
`clientRpcCommitFilesAskTimeout`, while the non-retry path uses a bounded
timeout. This can cause retry RPCs to hang indefinitely (or use a different
default) and behave differently from initial attempts. Use the same timeout
variant for the retry attempt to keep behavior consistent and avoid stuck
commits.
```suggestion
                  worker.endpoint.ask[CommitFilesResponse](message, clientRpcCommitFilesAskTimeout).onComplete {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]