Copilot commented on code in PR #3642:
URL: https://github.com/apache/celeborn/pull/3642#discussion_r3021680385


##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -5442,6 +5443,14 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("3s")
 
+  val CLIENT_COMMIT_FILE_REQUEST_RETRY_INTERVAL: ConfigEntry[Long] =
+    buildConf("celeborn.client.requestCommitFiles.retryInterval")
+      .categories("client")
+      .doc("Wait interval before next requestCommitFiles RPC retry.")
+      .version("0.7.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("10s")

Review Comment:
   Config naming is slightly inconsistent with existing retry settings like 
`*.retryWait` (e.g., `registerShuffle.retryWait`, `reserveSlots.retryWait`). If 
this is intended as a fixed delay between attempts, consider aligning the key 
name to `retryWait` for consistency, or clarify in the doc that this is a fixed 
interval (not backoff/jitter) to avoid ambiguity for operators.
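To illustrate the distinction the doc string should make explicit, here is a minimal, self-contained sketch (not Celeborn code; `retryWithFixedWait` is a hypothetical helper) of fixed-interval retry semantics: every attempt waits the same `retryWaitMs`, with no backoff or jitter.

```scala
// Hypothetical helper: retries op up to maxAttempts times, sleeping a
// constant retryWaitMs between attempts (fixed interval, no backoff/jitter).
def retryWithFixedWait[T](maxAttempts: Int, retryWaitMs: Long)(op: Int => Option[T]): Option[T] = {
  var attempt = 1
  while (attempt <= maxAttempts) {
    op(attempt) match {
      case some @ Some(_) => return some
      case None =>
        if (attempt < maxAttempts) Thread.sleep(retryWaitMs)
        attempt += 1
    }
  }
  None
}

// Succeeds on the third attempt; total wait is exactly 2 * retryWaitMs.
val result = retryWithFixedWait(maxAttempts = 3, retryWaitMs = 10L) { attempt =>
  if (attempt == 3) Some("committed") else None
}
```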



##########
client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala:
##########
@@ -512,6 +518,33 @@ abstract class CommitHandler(
           message.primaryIds,
           message.replicaIds)
       }(ec)
+    } else if (isRetry) {
+      val promise = Promise[CommitFilesResponse]()
+      commitRetryScheduler.schedule(
+        new Runnable {
+          override def run(): Unit = {
+            try {
+              worker.endpoint.ask[CommitFilesResponse](message).onComplete {
+                result => promise.complete(result)
+              }(ec)
+            } catch {
+              case e: Exception =>
+                logError(
+                  s"Failed to send CommitFiles to worker($worker) " +
+                    s"for ${message.shuffleId} after retry delay.",
+                  e)
+                promise.success(CommitFilesResponse(
+                  StatusCode.REQUEST_FAILED,
+                  List.empty.asJava,
+                  List.empty.asJava,
+                  message.primaryIds,
+                  message.replicaIds))
+            }
+          }
+        },
+        retryInterval,
+        TimeUnit.MILLISECONDS)

Review Comment:
   If `commitRetryScheduler.schedule(...)` throws (e.g., 
`RejectedExecutionException` after shutdown), the `promise` is never completed 
and callers can hang forever waiting on `promise.future`. Wrap the 
`schedule(...)` call itself in a try/catch and complete the promise with a 
failure/`REQUEST_FAILED` response when scheduling is rejected.
   ```suggestion
         try {
           commitRetryScheduler.schedule(
             new Runnable {
               override def run(): Unit = {
                 try {
                worker.endpoint.ask[CommitFilesResponse](message).onComplete {
                     result => promise.complete(result)
                   }(ec)
                 } catch {
                   case e: Exception =>
                     logError(
                       s"Failed to send CommitFiles to worker($worker) " +
                         s"for ${message.shuffleId} after retry delay.",
                       e)
                     promise.success(CommitFilesResponse(
                       StatusCode.REQUEST_FAILED,
                       List.empty.asJava,
                       List.empty.asJava,
                       message.primaryIds,
                       message.replicaIds))
                 }
               }
             },
             retryInterval,
             TimeUnit.MILLISECONDS)
         } catch {
           case e: Exception =>
             logError(
               s"Failed to schedule CommitFiles retry to worker($worker) " +
                 s"for ${message.shuffleId}.",
               e)
             promise.success(CommitFilesResponse(
               StatusCode.REQUEST_FAILED,
               List.empty.asJava,
               List.empty.asJava,
               message.primaryIds,
               message.replicaIds))
         }
   ```
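A standalone demonstration of the failure mode (plain `java.util.concurrent`, not Celeborn code): scheduling on an executor that has been shut down throws `RejectedExecutionException`, and without the surrounding catch the promise would never complete and any `Await` on its future would hang. The `"REQUEST_FAILED"` string below stands in for the failure response.

```scala
import java.util.concurrent.{Executors, RejectedExecutionException, TimeUnit}
import scala.concurrent.Promise

// Shut the scheduler down first so the subsequent schedule(...) is rejected.
val scheduler = Executors.newSingleThreadScheduledExecutor()
scheduler.shutdownNow()

val promise = Promise[String]()
try {
  scheduler.schedule(new Runnable {
    override def run(): Unit = promise.success("ran")
  }, 10, TimeUnit.MILLISECONDS)
} catch {
  case _: RejectedExecutionException =>
    // Completing the promise here is what keeps callers from hanging.
    promise.success("REQUEST_FAILED")
}
```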



##########
client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala:
##########
@@ -611,4 +644,8 @@ abstract class CommitHandler(
       fileGroups.remove(partitionId)
     }
   }
+
+  def stop(): Unit = {
+    commitRetryScheduler.shutdownNow()

Review Comment:
   `shutdownNow()` cancels queued scheduled retry tasks. Any in-flight delayed 
retries that already returned a `Future` (backed by a `Promise`) may never run 
and therefore may never complete, which can lead to stuck commit operations 
during/around shutdown. Prefer a graceful shutdown (`shutdown` + bounded await 
termination) or ensure canceled scheduled tasks complete their associated 
promises (e.g., reject new retries up-front and complete pending promises with 
`REQUEST_FAILED`).
   ```suggestion
       commitRetryScheduler.shutdown()
       if (!commitRetryScheduler.awaitTermination(30, TimeUnit.SECONDS)) {
         commitRetryScheduler.shutdownNow()
       }
   ```
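A self-contained sketch of the graceful pattern (standard `ScheduledThreadPoolExecutor` behavior, not Celeborn code): `shutdown()` stops accepting new tasks but, by default, lets already-scheduled delayed tasks run, so a bounded `awaitTermination` gives pending retries a chance to complete before `shutdownNow()` force-cancels whatever remains.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

val scheduler = Executors.newSingleThreadScheduledExecutor()
val taskRan = new AtomicBoolean(false)

// A delayed "retry" already queued before shutdown.
scheduler.schedule(new Runnable {
  override def run(): Unit = taskRan.set(true)
}, 50, TimeUnit.MILLISECONDS)

scheduler.shutdown() // no new tasks, but the queued delayed task may still run
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
  scheduler.shutdownNow() // last resort: drop anything still queued
}
```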



##########
client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala:
##########
@@ -80,7 +80,11 @@ abstract class CommitHandler(
 
   val ec = ExecutionContext.fromExecutor(sharedRpcPool)
 
+  private val commitRetryScheduler: ScheduledExecutorService =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("commit-retry-scheduler")

Review Comment:
   A dedicated single-thread scheduler is created per `CommitHandler` instance. 
If the system creates many commit handlers, this can lead to excess thread 
creation and overhead. Consider sharing a scheduler across handlers (e.g., 
owned by `CommitManager` and passed in) or using a shared scheduled executor 
while still avoiding blocking the shared RPC pool.
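A hypothetical sketch of the injection approach (class and method names are illustrative, not the Celeborn API): the owner constructs one scheduler and passes it to every handler, so N handlers share one thread instead of creating N.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical handler that receives a shared scheduler instead of owning one.
class SharedCommitHandler(scheduler: ScheduledExecutorService) {
  def scheduleRetry(task: Runnable, delayMs: Long): Unit =
    scheduler.schedule(task, delayMs, TimeUnit.MILLISECONDS)
}

// One scheduler thread backs all four handlers.
val shared = Executors.newSingleThreadScheduledExecutor()
val handlers = (1 to 4).map(_ => new SharedCommitHandler(shared))
val completed = new AtomicInteger(0)
handlers.foreach(_.scheduleRetry(new Runnable {
  override def run(): Unit = completed.incrementAndGet()
}, 1L))

shared.shutdown()
shared.awaitTermination(5, TimeUnit.SECONDS)
```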



##########
client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala:
##########
@@ -512,6 +518,33 @@ abstract class CommitHandler(
           message.primaryIds,
           message.replicaIds)
       }(ec)
+    } else if (isRetry) {
+      val promise = Promise[CommitFilesResponse]()
+      commitRetryScheduler.schedule(
+        new Runnable {
+          override def run(): Unit = {
+            try {
+              worker.endpoint.ask[CommitFilesResponse](message).onComplete {

Review Comment:
   The retry path uses `ask(message)` without applying 
`clientRpcCommitFilesAskTimeout`, while the non-retry path uses a bounded 
timeout. This can cause retry RPCs to hang indefinitely (or use a different 
default) and behave differently from initial attempts. Use the same timeout 
variant for the retry attempt to keep behavior consistent and avoid stuck 
commits.
   ```suggestion
                worker.endpoint.ask[CommitFilesResponse](message, clientRpcCommitFilesAskTimeout).onComplete {
   ```
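A standalone illustration of why the bounded timeout matters (plain `scala.concurrent`, not the Celeborn RPC API): with a bound, a reply that never arrives surfaces as a `TimeoutException` instead of an indefinite hang, which is the behavior the explicit ask timeout preserves on the retry path.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// A future that never completes, standing in for a worker that never replies.
val neverCompletes = Promise[String]().future

// A bounded wait fails fast rather than blocking forever.
val timedOut =
  try { Await.result(neverCompletes, 100.millis); false }
  catch { case _: TimeoutException => true }
```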



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to