This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 84f4bd76f [CELEBORN-2283][BUG] Fix missing return in Master.handleRequestSlots when all workers are excluded
84f4bd76f is described below

commit 84f4bd76f64c0655de388efbbeda556b36d1d5ba
Author: yew1eb <[email protected]>
AuthorDate: Wed Mar 18 22:20:47 2026 +0800

    [CELEBORN-2283][BUG] Fix missing return in Master.handleRequestSlots when all workers are excluded
    
    ### What changes were proposed in this pull request?
      Add a missing `return` statement after `context.reply()` in
      `Master#handleRequestSlots` when `numAvailableWorkers == 0`.
    
    ### Why are the changes needed?
      When all workers are excluded, the code replies with `WORKER_EXCLUDED`
      but then falls through to `Random.nextInt(numAvailableWorkers)`
      (i.e. `Random.nextInt(0)`), which throws `IllegalArgumentException`.
      This results in a duplicate response being sent to the client and
      misleading error logs on the Master side.
    
    ### Does this PR resolve a correctness bug?
     No.
    
    ### Does this PR introduce _any_ user-facing change?
     No.
    
    ### How was this patch tested?
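    Existing unit tests, plus a new `MasterSuite` test verifying that `handleRequestSlots` replies `WORKER_EXCLUDED` without throwing when no workers are available.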
    
    Closes #3628 from yew1eb/CELEBORN-2283.
    
    Authored-by: yew1eb <[email protected]>
    Signed-off-by: Shuang <[email protected]>
    (cherry picked from commit b4cb5a0b1ac097d33baf8dded1b5be2afd0578a4)
    Signed-off-by: Shuang <[email protected]>
---
 .../celeborn/service/deploy/master/Master.scala    |  1 +
 .../service/deploy/master/MasterSuite.scala        | 45 +++++++++++++++++++++-
 2 files changed, 45 insertions(+), 1 deletion(-)

diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index ad949eaba..ef5c23e66 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -934,6 +934,7 @@ private[celeborn] class Master(
       logError(s"Offer slots for $shuffleKey failed due to all workers are 
excluded!")
       context.reply(
         RequestSlotsResponse(StatusCode.WORKER_EXCLUDED, new WorkerResource(), 
requestSlots.packed))
+      return
     }
 
     val numWorkers = Math.min(
diff --git 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
index 7b7a7a1f6..0a8a7b592 100644
--- 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
+++ 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
@@ -18,13 +18,19 @@
 package org.apache.celeborn.service.deploy.master
 
 import java.nio.file.Files
+import java.util
 
-import org.mockito.Mockito.mock
+import org.mockito.ArgumentCaptor
+import org.mockito.Mockito.{mock, verify}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 import org.scalatest.funsuite.AnyFunSuite
 
 import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.identity.UserIdentifier
 import org.apache.celeborn.common.protocol.{PbCheckForWorkerTimeout, 
PbRegisterWorker}
+import 
org.apache.celeborn.common.protocol.message.ControlMessages.{RequestSlots, 
RequestSlotsResponse}
+import org.apache.celeborn.common.protocol.message.StatusCode
+import org.apache.celeborn.common.rpc.RpcCallContext
 import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils}
 
 class MasterSuite extends AnyFunSuite
@@ -154,4 +160,41 @@ class MasterSuite extends AnyFunSuite
     assert(master.workerHostAllowedToRegister("deny.k8s.io"))
     master.rpcEnv.shutdown()
   }
+
+  test("handleRequestSlots replies WORKER_EXCLUDED and does not throw when no 
workers available") {
+    val conf = new CelebornConf()
+    val randomMasterPort = selectRandomPort()
+    val randomHttpPort = selectRandomPort()
+    conf.set(CelebornConf.HA_ENABLED.key, "false")
+    conf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1")
+    conf.set(CelebornConf.MASTER_HTTP_PORT.key, randomHttpPort.toString)
+
+    val args = Array("-h", "localhost", "-p", randomMasterPort.toString)
+    val masterArgs = new MasterArguments(args, conf)
+    val master = new Master(conf, masterArgs)
+
+    // No workers registered — availableWorkers is empty
+    val requestSlots = RequestSlots(
+      "app1",
+      0,
+      new util.ArrayList[Integer](),
+      "localhost",
+      shouldReplicate = false,
+      shouldRackAware = false,
+      new UserIdentifier("tenant", "user"),
+      maxWorkers = 0,
+      availableStorageTypes = 0)
+
+    val context = mock(classOf[RpcCallContext])
+    val captor = ArgumentCaptor.forClass(classOf[Any])
+
+    // Should not throw IllegalArgumentException
+    master.handleRequestSlots(context, requestSlots)
+
+    verify(context).reply(captor.capture())
+    val response = captor.getValue.asInstanceOf[RequestSlotsResponse]
+    assert(response.status === StatusCode.WORKER_EXCLUDED)
+
+    master.rpcEnv.shutdown()
+  }
 }

Reply via email to