This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 84f4bd76f [CELEBORN-2283][BUG] Fix missing return in
Master.handleRequestSlots when all workers are excluded
84f4bd76f is described below
commit 84f4bd76f64c0655de388efbbeda556b36d1d5ba
Author: yew1eb <[email protected]>
AuthorDate: Wed Mar 18 22:20:47 2026 +0800
[CELEBORN-2283][BUG] Fix missing return in Master.handleRequestSlots when
all workers are excluded
### What changes were proposed in this pull request?
Add a missing `return` statement after `context.reply()` in
`Master#handleRequestSlots`
when `numAvailableWorkers == 0`.
### Why are the changes needed?
When all workers are excluded, the code replies with `WORKER_EXCLUDED` but,
because the `return` is missing, falls through to
`Random.nextInt(numAvailableWorkers)` (i.e. `Random.nextInt(0)`), which
throws `IllegalArgumentException`. As a result, a duplicate response is
sent to the client and misleading error logs appear on the Master side.
### Does this PR resolve a correctness bug?
No.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
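Existing unit tests, plus a new `MasterSuite` test verifying that
`handleRequestSlots` replies `WORKER_EXCLUDED` and does not throw when no
workers are available.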
Closes #3628 from yew1eb/CELEBORN-2283.
Authored-by: yew1eb <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit b4cb5a0b1ac097d33baf8dded1b5be2afd0578a4)
Signed-off-by: Shuang <[email protected]>
---
.../celeborn/service/deploy/master/Master.scala | 1 +
.../service/deploy/master/MasterSuite.scala | 45 +++++++++++++++++++++-
2 files changed, 45 insertions(+), 1 deletion(-)
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index ad949eaba..ef5c23e66 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -934,6 +934,7 @@ private[celeborn] class Master(
logError(s"Offer slots for $shuffleKey failed due to all workers are
excluded!")
context.reply(
RequestSlotsResponse(StatusCode.WORKER_EXCLUDED, new WorkerResource(),
requestSlots.packed))
+ return
}
val numWorkers = Math.min(
diff --git
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
index 7b7a7a1f6..0a8a7b592 100644
---
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
+++
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
@@ -18,13 +18,19 @@
package org.apache.celeborn.service.deploy.master
import java.nio.file.Files
+import java.util
-import org.mockito.Mockito.mock
+import org.mockito.ArgumentCaptor
+import org.mockito.Mockito.{mock, verify}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.protocol.{PbCheckForWorkerTimeout,
PbRegisterWorker}
+import
org.apache.celeborn.common.protocol.message.ControlMessages.{RequestSlots,
RequestSlotsResponse}
+import org.apache.celeborn.common.protocol.message.StatusCode
+import org.apache.celeborn.common.rpc.RpcCallContext
import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils}
class MasterSuite extends AnyFunSuite
@@ -154,4 +160,41 @@ class MasterSuite extends AnyFunSuite
assert(master.workerHostAllowedToRegister("deny.k8s.io"))
master.rpcEnv.shutdown()
}
+
+ test("handleRequestSlots replies WORKER_EXCLUDED and does not throw when no
workers available") {
+ val conf = new CelebornConf()
+ val randomMasterPort = selectRandomPort()
+ val randomHttpPort = selectRandomPort()
+ conf.set(CelebornConf.HA_ENABLED.key, "false")
+ conf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1")
+ conf.set(CelebornConf.MASTER_HTTP_PORT.key, randomHttpPort.toString)
+
+ val args = Array("-h", "localhost", "-p", randomMasterPort.toString)
+ val masterArgs = new MasterArguments(args, conf)
+ val master = new Master(conf, masterArgs)
+
+ // No workers registered — availableWorkers is empty
+ val requestSlots = RequestSlots(
+ "app1",
+ 0,
+ new util.ArrayList[Integer](),
+ "localhost",
+ shouldReplicate = false,
+ shouldRackAware = false,
+ new UserIdentifier("tenant", "user"),
+ maxWorkers = 0,
+ availableStorageTypes = 0)
+
+ val context = mock(classOf[RpcCallContext])
+ val captor = ArgumentCaptor.forClass(classOf[Any])
+
+ // Should not throw IllegalArgumentException
+ master.handleRequestSlots(context, requestSlots)
+
+ verify(context).reply(captor.capture())
+ val response = captor.getValue.asInstanceOf[RequestSlotsResponse]
+ assert(response.status === StatusCode.WORKER_EXCLUDED)
+
+ master.rpcEnv.shutdown()
+ }
}