This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 7b25797fb [CELEBORN-2274] Fix replicate channels not resumed when
transitioning from PUSH_AND_REPLICATE_PAUSED to PUSH_PAUSED
7b25797fb is described below
commit 7b25797fb8cf08c3495419b1d872663b08f829fb
Author: Shuai Lu <[email protected]>
AuthorDate: Wed Mar 18 22:18:21 2026 +0800
[CELEBORN-2274] Fix replicate channels not resumed when transitioning from
PUSH_AND_REPLICATE_PAUSED to PUSH_PAUSED
### What changes were proposed in this pull request?
Fix a bug in `MemoryManager.switchServingState()` where replicate channels
can be left permanently with `autoRead=false` after a memory pressure event.
When the serving state transitions from `PUSH_AND_REPLICATE_PAUSED` to
`PUSH_PAUSED`, `resumeReplicate()` was only called inside the
`!tryResumeByPinnedMemory()` guard. If `tryResumeByPinnedMemory()` returned
`true`, the entire block was skipped and replicate channels were never resumed.
The fix moves `resumeReplicate()` out of (and ahead of) the
`tryResumeByPinnedMemory()` guard, so it is always called when stepping down
from `PUSH_AND_REPLICATE_PAUSED` to `PUSH_PAUSED`. This enforces a
state-machine invariant: in `PUSH_PAUSED` only push is paused, so replicate
must always be resumed.
### Why are the changes needed?
Once replicate channels are stuck with `autoRead=false`, Netty I/O threads
stop reading from all replicate connections. Remote workers writing to the
affected worker see their TCP send buffers fill up (the affected worker
advertises a zero receive window), so pending writes accumulate in
`ChannelOutboundBuffer`. Each pending write holds a reference to a direct
memory `ByteBuf`, so direct memory on the remote workers grows without bound.
The failure sequence:
1. Worker hits memory pressure → state = `PUSH_AND_REPLICATE_PAUSED` → all
channels paused
2. Pinned memory is low → `tryResumeByPinnedMemory()` returns `true` →
`resumeByPinnedMemory(PUSH_PAUSED)` resumes push only; replicate is not resumed
3. Memory drops to the push-only range → state = `PUSH_PAUSED`, but
`resumeReplicate()` is never called
4. Replicate channels stay stuck with `autoRead=false`, causing
unbounded direct memory growth on remote workers
### Does this PR resolve a correctness bug?
Yes.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a new unit test `Test MemoryManager resume replicate by pinned
memory` in `MemoryManagerSuite` that reproduces the exact failure scenario:
1. Enter `PUSH_AND_REPLICATE_PAUSED` with low pinned memory (channels are
resumed via the pinned-memory path)
2. Raise pinned memory so that both push and replicate get paused
3. Drop memory into the `PUSH_PAUSED` range with low pinned memory
4. Assert that the replicate listener is resumed — this assertion fails without
the fix
Closes #3616 from sl3635/CELEBORN-2274.
Authored-by: Shuai Lu <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../deploy/worker/memory/MemoryManager.java | 4 +-
.../service/deploy/memory/MemoryManagerSuite.scala | 48 ++++++++++++++++++++++
2 files changed, 51 insertions(+), 1 deletion(-)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index dde5793c5..1e3540e08 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -335,11 +335,13 @@ public class MemoryManager {
}
switch (servingState) {
case PUSH_PAUSED:
+ if (lastState == ServingState.PUSH_AND_REPLICATE_PAUSED) {
+ resumeReplicate();
+ }
if (!tryResumeByPinnedMemory(servingState, lastState)) {
pausePushDataCounter.increment();
if (lastState == ServingState.PUSH_AND_REPLICATE_PAUSED) {
appendPauseSpentTime(lastState);
- resumeReplicate();
} else {
if (servingState != lastState) {
pausePushDataStartTime = System.currentTimeMillis();
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
index 2a9f8fa09..6362273d1 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
@@ -298,6 +298,54 @@ class MemoryManagerSuite extends CelebornFunSuite {
}
+ test("Test MemoryManager resume replicate by pinned memory") {
+ val conf = new CelebornConf()
+ conf.set(CelebornConf.WORKER_DIRECT_MEMORY_CHECK_INTERVAL.key, "300s")
+ conf.set(CelebornConf.WORKER_PINNED_MEMORY_CHECK_INTERVAL.key, "0")
+ MemoryManager.reset()
+ val memoryManager = MockitoSugar.spy(MemoryManager.initialize(conf))
+ val maxDirectorMemory = memoryManager.maxDirectMemory
+ val pushThreshold =
+ (conf.workerDirectMemoryRatioToPauseReceive *
maxDirectorMemory).longValue()
+ val replicateThreshold =
+ (conf.workerDirectMemoryRatioToPauseReplicate *
maxDirectorMemory).longValue()
+ val pinnedMemoryResumeThreshold =
+ (conf.workerPinnedMemoryRatioToResume * maxDirectorMemory).longValue()
+
+ val pushListener = new
MockMemoryPressureListener(TransportModuleConstants.PUSH_MODULE)
+ val replicateListener =
+ new MockMemoryPressureListener(TransportModuleConstants.REPLICATE_MODULE)
+ memoryManager.registerMemoryListener(pushListener)
+ memoryManager.registerMemoryListener(replicateListener)
+
+ // NONE_PAUSED -> PUSH_AND_REPLICATE_PAUSED, resumed by low pinned memory
+ Mockito.when(memoryManager.getNettyPinnedDirectMemory).thenReturn(0L)
+ Mockito.when(memoryManager.getMemoryUsage).thenReturn(replicateThreshold +
1)
+ memoryManager.switchServingState()
+ assert(!pushListener.isPause)
+ assert(!replicateListener.isPause)
+ assert(memoryManager.servingState ==
ServingState.PUSH_AND_REPLICATE_PAUSED)
+
+ // KEEP PUSH_AND_REPLICATE_PAUSED, pinned memory rises, both get paused
+ Mockito.when(memoryManager.getNettyPinnedDirectMemory).thenReturn(
+ pinnedMemoryResumeThreshold + 1)
+ memoryManager.switchServingState()
+ assert(pushListener.isPause)
+ assert(replicateListener.isPause)
+ assert(memoryManager.servingState ==
ServingState.PUSH_AND_REPLICATE_PAUSED)
+
+ // PUSH_AND_REPLICATE_PAUSED -> PUSH_PAUSED, pinned memory low again
+ // replicate must be resumed regardless of tryResumeByPinnedMemory
+ Mockito.when(memoryManager.getNettyPinnedDirectMemory).thenReturn(0L)
+ Mockito.when(memoryManager.getMemoryUsage).thenReturn(pushThreshold + 1)
+ memoryManager.switchServingState()
+ assert(!pushListener.isPause)
+ assert(!replicateListener.isPause)
+ assert(memoryManager.servingState == ServingState.PUSH_PAUSED)
+
+ MemoryManager.reset()
+ }
+
class MockMemoryPressureListener(
val belongModuleName: String,
var isPause: Boolean = false) extends MemoryPressureListener {