This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 4317657c7 [CELEBORN-2044] Proactively cleanup stream state from ChunkStreamManager when the stream ends
4317657c7 is described below
commit 4317657c7bc8f2ec83a8742b218efc1e5028f3c3
Author: Mridul Muralidharan <mridulatgmail.com>
AuthorDate: Tue Jun 24 12:51:00 2025 -0500
[CELEBORN-2044] Proactively cleanup stream state from ChunkStreamManager when the stream ends
### What changes were proposed in this pull request?
Proactively clean up stream state from ChunkStreamManager when the stream is closed.
### Why are the changes needed?
Stream state is removed from ChunkStreamManager only when the shuffle expires at the master, which can take a while.
In the meantime, the retained stream state causes nontrivial memory utilization on workers.
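The difference between looking stream state up and removing it at end of stream can be illustrated with a minimal sketch, assuming a simplified worker-side map from stream id to stream state. `SketchStreamState`, `SketchStreamRegistry`, and their methods are hypothetical names for illustration, not Celeborn's API, and the real `ChunkStreamManager` tracks more per-stream data than this.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for per-stream state (not Celeborn's StreamState).
final class SketchStreamState {
  final String shuffleKey;
  final String fileName;

  SketchStreamState(String shuffleKey, String fileName) {
    this.shuffleKey = shuffleKey;
    this.fileName = fileName;
  }
}

// Hypothetical registry loosely modeled on a streamId -> state map.
final class SketchStreamRegistry {
  private final Map<Long, SketchStreamState> streams = new ConcurrentHashMap<>();

  void register(long streamId, SketchStreamState state) {
    streams.put(streamId, state);
  }

  // Lookup-only end of stream: the entry stays in the map afterwards.
  SketchStreamState endStreamLookupOnly(long streamId) {
    return streams.get(streamId);
  }

  // Remove-on-end: the entry leaves the map immediately and is returned to the caller.
  SketchStreamState endStreamAndRemove(long streamId) {
    return streams.remove(streamId);
  }

  // Expiry path: drop every remaining stream that belongs to the expired shuffle.
  void expireShuffle(String shuffleKey) {
    streams.values().removeIf(s -> s.shuffleKey.equals(shuffleKey));
  }

  int size() {
    return streams.size();
  }
}
```

With two streams registered for one shuffle, ending stream 1 by lookup still leaves two entries, ending stream 2 by removal leaves one, and only the expiry path clears what remains.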
### Does this PR introduce _any_ user-facing change?
No. Reduces memory usage in workers.
### How was this patch tested?
Existing unit tests
Closes #3343 from mridulm/proactively-cleanup-streams.
Authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 8ae97376010c191293d3dd5ae5f39aba5692eb57)
Signed-off-by: Wang, Fei <[email protected]>
---
.../celeborn/service/deploy/worker/storage/ChunkStreamManager.java | 4 ++++
.../org/apache/celeborn/service/deploy/worker/FetchHandler.scala | 2 +-
2 files changed, 5 insertions(+), 1 deletion(-)
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
index 91852ab0c..86ecfbe6a 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
@@ -196,6 +196,10 @@ public class ChunkStreamManager {
     return streams.get(streamId);
   }
 
+  public StreamState removeStreamState(long streamId) {
+    return streams.remove(streamId);
+  }
+
   public int getStreamsCount() {
     return streams.size();
   }
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index aa60796c8..edf71039f 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -485,7 +485,7 @@ class FetchHandler(
       streamType: StreamType): Unit = {
     streamType match {
       case StreamType.ChunkStream =>
-        val streamState = chunkStreamManager.getStreamState(streamId)
+        val streamState = chunkStreamManager.removeStreamState(streamId)
         if (streamState != null) {
           val (shuffleKey, fileName) = (streamState.shuffleKey, streamState.fileName)
           workerSource.recordAppActiveConnection(client, shuffleKey)
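Because `java.util.Map.remove` returns the value it removed, swapping `getStreamState` for `removeStreamState` still lets the handler read `shuffleKey` and `fileName` from the returned state, while a repeated end-of-stream for the same id gets `null` and skips the block guarded by `streamState != null`. A minimal Java sketch of that pattern, assuming a simplified map from stream id to shuffle key; `EndStreamSketch` and its members are hypothetical stand-ins for the Scala handler and its `workerSource` bookkeeping, not Celeborn code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a remove-on-end handler; not Celeborn's API.
final class EndStreamSketch {
  // streamId -> shuffleKey, a stand-in for a streamId -> StreamState map.
  private final Map<Long, String> streams = new ConcurrentHashMap<>();

  // Shuffle keys processed by the guarded block, a stand-in for the handler's
  // bookkeeping. A plain ArrayList is fine because this demo is single-threaded.
  final List<String> handled = new ArrayList<>();

  void register(long streamId, String shuffleKey) {
    streams.put(streamId, shuffleKey);
  }

  // remove() frees the entry and hands it back, so its fields remain usable;
  // a second call for the same id returns null and the guarded block is skipped.
  void handleEndStream(long streamId) {
    String shuffleKey = streams.remove(streamId);
    if (shuffleKey != null) {
      handled.add(shuffleKey);
    }
  }

  int streamsCount() {
    return streams.size();
  }
}
```

Registering stream 7 and ending it twice runs the guarded block once and leaves the map empty.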