This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 84795bc63 [CELEBORN-382] Call checkDiskFullAndSplit in the
handlePushData method to avoid repeated definitions (#1313)
84795bc63 is described below
commit 84795bc63bce06a2d5c10269cf3e3d57086538f3
Author: jiaoqingbo <[email protected]>
AuthorDate: Tue Mar 7 18:55:46 2023 +0800
[CELEBORN-382] Call checkDiskFullAndSplit in the handlePushData method to
avoid repeated definitions (#1313)
---
.../service/deploy/worker/PushDataHandler.scala | 20 +++-----------------
1 file changed, 3 insertions(+), 17 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 273cce6e9..56d21beb9 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -224,23 +224,9 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
callbackWithTimer.onFailure(new CelebornIOException(cause))
return
}
- val diskFull =
- if (fileWriter.flusher.isInstanceOf[LocalFlusher]) {
- workerInfo.diskInfos
- .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
- .actualUsableSpace < diskReserveSize
- } else {
- false
- }
- if ((diskFull && fileWriter.getFileInfo.getFileLength >
partitionSplitMinimumSize) ||
- (isMaster && fileWriter.getFileInfo.getFileLength >
fileWriter.getSplitThreshold())) {
- if (fileWriter.getSplitMode == PartitionSplitMode.SOFT) {
- softSplit.set(true)
- } else {
-
callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
- return
- }
- }
+
+ if (checkDiskFullAndSplit(fileWriter, isMaster, softSplit,
callbackWithTimer)) return
+
fileWriter.incrementPendingWrites()
// for master, send data to slave