This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 84795bc63 [CELEBORN-382] Call checkDiskFullAndSplit in the 
handlePushData method to avoid repeated definitions (#1313)
84795bc63 is described below

commit 84795bc63bce06a2d5c10269cf3e3d57086538f3
Author: jiaoqingbo <[email protected]>
AuthorDate: Tue Mar 7 18:55:46 2023 +0800

    [CELEBORN-382] Call checkDiskFullAndSplit in the handlePushData method to 
avoid repeated definitions (#1313)
---
 .../service/deploy/worker/PushDataHandler.scala      | 20 +++-----------------
 1 file changed, 3 insertions(+), 17 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 273cce6e9..56d21beb9 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -224,23 +224,9 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
       callbackWithTimer.onFailure(new CelebornIOException(cause))
       return
     }
-    val diskFull =
-      if (fileWriter.flusher.isInstanceOf[LocalFlusher]) {
-        workerInfo.diskInfos
-          .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
-          .actualUsableSpace < diskReserveSize
-      } else {
-        false
-      }
-    if ((diskFull && fileWriter.getFileInfo.getFileLength > 
partitionSplitMinimumSize) ||
-      (isMaster && fileWriter.getFileInfo.getFileLength > 
fileWriter.getSplitThreshold())) {
-      if (fileWriter.getSplitMode == PartitionSplitMode.SOFT) {
-        softSplit.set(true)
-      } else {
-        
callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
-        return
-      }
-    }
+
+    if (checkDiskFullAndSplit(fileWriter, isMaster, softSplit, 
callbackWithTimer)) return
+
     fileWriter.incrementPendingWrites()
 
     // for master, send data to slave

Reply via email to