This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 4381dd3c7 [CELEBORN-1602] do hard split for push merged data RPC with disk full
4381dd3c7 is described below

commit 4381dd3c72f7a7b1721260db461dacf241568b5f
Author: Erik.fang <[email protected]>
AuthorDate: Fri Sep 20 14:42:19 2024 +0800

    [CELEBORN-1602] do hard split for push merged data RPC with disk full
    
    ### What changes were proposed in this pull request?
    
    Return a hard split (`StatusCode.HARD_SPLIT`) for the push merged data RPC when the disk is full.
    
    ### Why are the changes needed?
    
    Prevent the worker disk from being written to 100% capacity.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Cluster tested
    
    Closes #2747 from ErikFang/handle-merged-data-disk-full.
    
    Authored-by: Erik.fang <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../apache/celeborn/service/deploy/worker/PushDataHandler.scala  | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 421e5ddfd..c98acaf10 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -533,6 +533,15 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
       callbackWithTimer.onFailure(new CelebornIOException(cause))
       return
     }
+
+    if (fileWriters.exists(checkDiskFull(_) == true)) {
+      val (mapId, attemptId) = getMapAttempt(body)
+      logWarning(
+        s"return hard split for disk full with shuffle $shuffleKey map $mapId 
attempt $attemptId")
+      
callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
+      return
+    }
+
     fileWriters.foreach(_.incrementPendingWrites())
 
     val closedFileWriter = fileWriters.find(_.isClosed)

Reply via email to