This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new db43de843 [CELEBORN-2274] Fix replicate channels not resumed when 
transitioning from PUSH_AND_REPLICATE_PAUSED to PUSH_PAUSED
db43de843 is described below

commit db43de843e6583c011ea0cbb0016de2403b13147
Author: Shuai Lu <[email protected]>
AuthorDate: Wed Mar 18 22:18:21 2026 +0800

    [CELEBORN-2274] Fix replicate channels not resumed when transitioning from 
PUSH_AND_REPLICATE_PAUSED to PUSH_PAUSED
    
    ### What changes were proposed in this pull request?
    
    Fix a bug in `MemoryManager.switchServingState()` where replicate channels 
can be left permanently with `autoRead=false` after a memory pressure event.
    
    When the serving state transitions from `PUSH_AND_REPLICATE_PAUSED` to 
`PUSH_PAUSED`, `resumeReplicate()` was only called inside the 
`!tryResumeByPinnedMemory()` guard. If `tryResumeByPinnedMemory()` returned 
`true`, the entire block was skipped and replicate channels were never resumed.
    
    The fix moves `resumeReplicate()` outside the `tryResumeByPinnedMemory()` 
guard so it is always called when stepping down from 
`PUSH_AND_REPLICATE_PAUSED` to `PUSH_PAUSED`. This is a state machine 
invariant: `PUSH_PAUSED` means only push is paused, so replicate must always be 
resumed on entering that state.
    
    ### Why are the changes needed?
    
    Once replicate channels are stuck with `autoRead=false`, Netty I/O threads 
stop reading from all replicate connections. Remote workers writing to the 
affected worker see their TCP send buffers fill up (zero window), causing 
pending writes to accumulate in `ChannelOutboundBuffer`. Each pending write 
holds a reference to a direct memory `ByteBuf`, causing direct memory to grow 
indefinitely on the remote workers.
    
    The failure sequence:
    1. Worker hits memory pressure → state = `PUSH_AND_REPLICATE_PAUSED` → all 
channels paused
    2. Pinned memory is low → `tryResumeByPinnedMemory()` returns `true` → 
`resumeByPinnedMemory(PUSH_PAUSED)` resumes push only, replicate not resumed
    3. Memory drops to push-only range → state = `PUSH_PAUSED`, but 
`resumeReplicate()` is never called
    4. Replicate channels permanently stuck with `autoRead=false`, causing 
unbounded direct memory growth on remote workers
    
    ### Does this PR resolve a correctness bug?
    
    Yes.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added a new unit test `Test MemoryManager resume replicate by pinned 
memory` in `MemoryManagerSuite` that reproduces the exact failure scenario:
    1. Enter `PUSH_AND_REPLICATE_PAUSED` with low pinned memory (channels 
resumed by pinned memory path)
    2. Raise pinned memory so both push and replicate get paused
    3. Drop memory to `PUSH_PAUSED` range with low pinned memory
    4. Assert replicate listener is resumed — this assertion fails without the 
fix
    
    Closes #3616 from sl3635/CELEBORN-2274.
    
    Authored-by: Shuai Lu <[email protected]>
    Signed-off-by: Shuang <[email protected]>
    (cherry picked from commit 7b25797fb8cf08c3495419b1d872663b08f829fb)
    Signed-off-by: Shuang <[email protected]>
---
 .../deploy/worker/memory/MemoryManager.java        |  4 +-
 .../service/deploy/memory/MemoryManagerSuite.scala | 48 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 1 deletion(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index a11b1b543..f066cc6fa 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -335,11 +335,13 @@ public class MemoryManager {
     }
     switch (servingState) {
       case PUSH_PAUSED:
+        if (lastState == ServingState.PUSH_AND_REPLICATE_PAUSED) {
+          resumeReplicate();
+        }
         if (!tryResumeByPinnedMemory(servingState, lastState)) {
           pausePushDataCounter.increment();
           if (lastState == ServingState.PUSH_AND_REPLICATE_PAUSED) {
             appendPauseSpentTime(lastState);
-            resumeReplicate();
           } else {
             if (servingState != lastState) {
               pausePushDataStartTime = System.currentTimeMillis();
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
index 2a9f8fa09..6362273d1 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
@@ -298,6 +298,54 @@ class MemoryManagerSuite extends CelebornFunSuite {
 
   }
 
+  test("Test MemoryManager resume replicate by pinned memory") {
+    val conf = new CelebornConf()
+    conf.set(CelebornConf.WORKER_DIRECT_MEMORY_CHECK_INTERVAL.key, "300s")
+    conf.set(CelebornConf.WORKER_PINNED_MEMORY_CHECK_INTERVAL.key, "0")
+    MemoryManager.reset()
+    val memoryManager = MockitoSugar.spy(MemoryManager.initialize(conf))
+    val maxDirectorMemory = memoryManager.maxDirectMemory
+    val pushThreshold =
+      (conf.workerDirectMemoryRatioToPauseReceive * 
maxDirectorMemory).longValue()
+    val replicateThreshold =
+      (conf.workerDirectMemoryRatioToPauseReplicate * 
maxDirectorMemory).longValue()
+    val pinnedMemoryResumeThreshold =
+      (conf.workerPinnedMemoryRatioToResume * maxDirectorMemory).longValue()
+
+    val pushListener = new 
MockMemoryPressureListener(TransportModuleConstants.PUSH_MODULE)
+    val replicateListener =
+      new MockMemoryPressureListener(TransportModuleConstants.REPLICATE_MODULE)
+    memoryManager.registerMemoryListener(pushListener)
+    memoryManager.registerMemoryListener(replicateListener)
+
+    // NONE_PAUSED -> PUSH_AND_REPLICATE_PAUSED, resumed by low pinned memory
+    Mockito.when(memoryManager.getNettyPinnedDirectMemory).thenReturn(0L)
+    Mockito.when(memoryManager.getMemoryUsage).thenReturn(replicateThreshold + 
1)
+    memoryManager.switchServingState()
+    assert(!pushListener.isPause)
+    assert(!replicateListener.isPause)
+    assert(memoryManager.servingState == 
ServingState.PUSH_AND_REPLICATE_PAUSED)
+
+    // KEEP PUSH_AND_REPLICATE_PAUSED, pinned memory rises, both get paused
+    Mockito.when(memoryManager.getNettyPinnedDirectMemory).thenReturn(
+      pinnedMemoryResumeThreshold + 1)
+    memoryManager.switchServingState()
+    assert(pushListener.isPause)
+    assert(replicateListener.isPause)
+    assert(memoryManager.servingState == 
ServingState.PUSH_AND_REPLICATE_PAUSED)
+
+    // PUSH_AND_REPLICATE_PAUSED -> PUSH_PAUSED, pinned memory low again
+    // replicate must be resumed regardless of tryResumeByPinnedMemory
+    Mockito.when(memoryManager.getNettyPinnedDirectMemory).thenReturn(0L)
+    Mockito.when(memoryManager.getMemoryUsage).thenReturn(pushThreshold + 1)
+    memoryManager.switchServingState()
+    assert(!pushListener.isPause)
+    assert(!replicateListener.isPause)
+    assert(memoryManager.servingState == ServingState.PUSH_PAUSED)
+
+    MemoryManager.reset()
+  }
+
   class MockMemoryPressureListener(
       val belongModuleName: String,
       var isPause: Boolean = false) extends MemoryPressureListener {

Reply via email to