This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 8ae973760 [CELEBORN-2044] Proactively cleanup stream state from 
ChunkStreamManager when the stream ends
8ae973760 is described below

commit 8ae97376010c191293d3dd5ae5f39aba5692eb57
Author: Mridul Muralidharan <mridulatgmail.com>
AuthorDate: Tue Jun 24 12:51:00 2025 -0500

    [CELEBORN-2044] Proactively cleanup stream state from ChunkStreamManager 
when the stream ends
    
    ### What changes were proposed in this pull request?
    Proactively clean up stream state from ChunkStreamManager when the stream is closed.
    
    ### Why are the changes needed?
    
    Stream gets closed only when shuffle expires at master, which can take a 
while.
    In the meantime, workers incur nontrivial memory utilization.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. Reduces memory usage in workers.
    
    ### How was this patch tested?
    
    Existing unit tests
    
    Closes #3343 from mridulm/proactively-cleanup-streams.
    
    Authored-by: Mridul Muralidharan <mridulatgmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../celeborn/service/deploy/worker/storage/ChunkStreamManager.java    | 4 ++++
 .../org/apache/celeborn/service/deploy/worker/FetchHandler.scala      | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
index 91852ab0c..86ecfbe6a 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
@@ -196,6 +196,10 @@ public class ChunkStreamManager {
     return streams.get(streamId);
   }
 
+  public StreamState removeStreamState(long streamId) {
+    return streams.remove(streamId);
+  }
+
   public int getStreamsCount() {
     return streams.size();
   }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index a943de58a..6ef8bdb2c 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -478,7 +478,7 @@ class FetchHandler(
       streamType: StreamType): Unit = {
     streamType match {
       case StreamType.ChunkStream =>
-        val streamState = chunkStreamManager.getStreamState(streamId)
+        val streamState = chunkStreamManager.removeStreamState(streamId)
         if (streamState != null) {
           val (shuffleKey, fileName) = (streamState.shuffleKey, 
streamState.fileName)
           workerSource.recordAppActiveConnection(client, shuffleKey)

Reply via email to