This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 9f0af3456 [CELEBORN-1564] Fix actualUsableSpace of offerSlotsLoadAware 
condition on diskInfo
9f0af3456 is described below

commit 9f0af3456a321d2b49909232eef87ca776fab1f6
Author: szt <[email protected]>
AuthorDate: Mon Aug 26 14:17:55 2024 +0800

    [CELEBORN-1564] Fix actualUsableSpace of offerSlotsLoadAware condition on 
diskInfo
    
    ### What changes were proposed in this pull request?
    Fix the actualUsableSpace condition on DiskInfo in offerSlotsLoadAware.
    StorageManager now subtracts the disk reserve (diskReserveSize /
    diskReserveRatio) when it runs updateDiskInfos, so the master no longer
    needs to calculate usable space when offering slots.
    
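    ### Why are the changes needed?
    Previously, the master's load-aware slot assignment compared each disk's
    actualUsableSpace against a minimum usable size computed on the master
    from diskReserveSize/diskReserveRatio. With this change, the worker
    subtracts its configured reserve (workerDiskReserveSize /
    workerDiskReserveRatio) before reporting usable space. The master can
    therefore simply require actualUsableSpace > 0.
    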
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
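    Unit tests: added `updateDiskInfosWithDiskReserveSize` to
    StorageManagerSuite, and updated SlotsAllocatorSuiteJ and
    CelebornHashCheckDiskSuite for the new behavior.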
    
    Closes #2688 from zaynt4606/main.
    
    Authored-by: szt <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../service/deploy/master/SlotsAllocator.java      | 13 +----
 .../celeborn/service/deploy/master/Master.scala    |  2 -
 .../deploy/master/SlotsAllocatorSuiteJ.java        |  5 --
 .../tests/spark/CelebornHashCheckDiskSuite.scala   |  7 +--
 .../deploy/worker/storage/StorageManager.scala     | 29 ++++++++---
 .../worker/storage/StorageManagerSuite.scala       | 56 +++++++++++++++++++++-
 6 files changed, 82 insertions(+), 30 deletions(-)

diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index eda5a3a2a..caf11f712 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -21,8 +21,6 @@ import java.util.*;
 import java.util.function.IntUnaryOperator;
 import java.util.stream.Collectors;
 
-import scala.Double;
-import scala.Option;
 import scala.Tuple2;
 
 import org.apache.commons.lang3.StringUtils;
@@ -35,7 +33,6 @@ import org.apache.celeborn.common.meta.DiskStatus;
 import org.apache.celeborn.common.meta.WorkerInfo;
 import org.apache.celeborn.common.protocol.PartitionLocation;
 import org.apache.celeborn.common.protocol.StorageInfo;
-import org.apache.celeborn.common.util.DiskUtils;
 
 public class SlotsAllocator {
   static class UsableDiskInfo {
@@ -112,8 +109,6 @@ public class SlotsAllocator {
           List<Integer> partitionIds,
           boolean shouldReplicate,
           boolean shouldRackAware,
-          long diskReserveSize,
-          Option<Double> diskReserveRatio,
           int diskGroupCount,
           double diskGroupGradient,
           double flushTimeWeight,
@@ -143,13 +138,7 @@ public class SlotsAllocator {
                 .forEach(
                     (key, diskInfo) -> {
                       diskToWorkerMap.put(diskInfo, i);
-                      if (diskInfo.actualUsableSpace()
-                              > DiskUtils.getMinimumUsableSize(
-                                  diskInfo,
-                                  diskReserveSize,
-                                  diskReserveRatio.isEmpty()
-                                      ? Option.empty()
-                                      : Option.apply(diskReserveRatio.get()))
+                      if (diskInfo.actualUsableSpace() > 0
                           && diskInfo.status().equals(DiskStatus.HEALTHY)
                           && diskInfo.storageType() != StorageInfo.Type.HDFS
                           && diskInfo.storageType() != StorageInfo.Type.S3) {
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 4bfdc2996..fa386e70e 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -867,8 +867,6 @@ private[celeborn] class Master(
               requestSlots.partitionIdList,
               requestSlots.shouldReplicate,
               requestSlots.shouldRackAware,
-              diskReserveSize,
-              diskReserveRatio,
               slotsAssignLoadAwareDiskGroupNum,
               slotsAssignLoadAwareDiskGroupGradient,
               loadAwareFlushTimeWeight,
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
index 1a0bc909b..0fb4c9d42 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.*;
 
-import scala.Option;
 import scala.Tuple2;
 
 import org.junit.Test;
@@ -232,8 +231,6 @@ public class SlotsAllocatorSuiteJ {
             partitionIds,
             shouldReplicate,
             false,
-            10 * 1024 * 1024 * 1024L,
-            Option.empty(),
             conf.masterSlotAssignLoadAwareDiskGroupNum(),
             conf.masterSlotAssignLoadAwareDiskGroupGradient(),
             conf.masterSlotAssignLoadAwareFlushTimeWeight(),
@@ -313,8 +310,6 @@ public class SlotsAllocatorSuiteJ {
               partitionIds,
               shouldReplicate,
               false,
-              1000_000_000,
-              Option.empty(),
               3,
               0.1,
               0,
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala
index 9fe49bfd4..5b461013d 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala
@@ -38,7 +38,8 @@ class CelebornHashCheckDiskSuite extends SparkTestBase {
       CelebornConf.APPLICATION_HEARTBEAT_TIMEOUT.key -> "10s")
     val workerConf = Map(
       CelebornConf.WORKER_STORAGE_DIRS.key -> "/tmp:capacity=1000",
-      CelebornConf.WORKER_HEARTBEAT_TIMEOUT.key -> "10s")
+      CelebornConf.WORKER_HEARTBEAT_TIMEOUT.key -> "10s",
+      CelebornConf.WORKER_DISK_RESERVE_SIZE.key -> "0G")
     workers = setupMiniClusterWithRandomPorts(masterConf, workerConf)._2
   }
 
@@ -76,10 +77,10 @@ class CelebornHashCheckDiskSuite extends SparkTestBase {
     assert(combineResult.equals(celebornCombineResult))
     assert(sqlResult.equals(celebornSqlResult))
 
-    // shuffle key not expired, diskInfo.actualUsableSpace < 0, no space
+    // shuffle key not expired, diskInfo.actualUsableSpace <= 0, no space
     workers.foreach { worker =>
       worker.storageManager.disksSnapshot().foreach { diskInfo =>
-        assert(diskInfo.actualUsableSpace < 0)
+        assert(diskInfo.actualUsableSpace <= 0)
       }
     }
     sparkSessionEnableCeleborn.stop()
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index b7a5eb75b..c7c8f8adf 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -43,7 +43,7 @@ import 
org.apache.celeborn.common.metrics.source.{AbstractSource, ThreadPoolSour
 import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf}
 import org.apache.celeborn.common.protocol.{PartitionLocation, 
PartitionSplitMode, PartitionType, StorageInfo}
 import org.apache.celeborn.common.quota.ResourceConsumption
-import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, 
JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
+import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, 
DiskUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
 import org.apache.celeborn.service.deploy.worker._
 import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
 import 
org.apache.celeborn.service.deploy.worker.memory.MemoryManager.MemoryPressureListener
@@ -73,6 +73,9 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
   val storageExpireDirTimeout = conf.workerStorageExpireDirTimeout
   val storagePolicy = new StoragePolicy(conf, this, workerSource)
 
+  val diskReserveSize = conf.workerDiskReserveSize
+  val diskReserveRatio = conf.workerDiskReserveRatio
+
   val topDiskUsageCount = conf.metricsAppTopDiskUsageCount
 
   // (deviceName -> deviceInfo) and (mount point -> diskInfo)
@@ -851,20 +854,32 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
           0
         }
       }.sum
-      val fileStore = Files.getFileStore(Paths.get(diskInfo.mountPoint))
-      val fileSystemReportedUsableSpace = fileStore.getUsableSpace
+
+      val (fileSystemReportedUsableSpace, fileSystemReportedTotalSpace) =
+        getFileSystemReportedSpace(diskInfo.mountPoint)
       val workingDirUsableSpace =
         Math.min(diskInfo.configuredUsableSpace - totalUsage, 
fileSystemReportedUsableSpace)
-      val totalSpace = fileStore.getTotalSpace
-      logDebug(s"updateDiskInfos workingDirUsableSpace:$workingDirUsableSpace 
filemeta:$fileSystemReportedUsableSpace conf:${diskInfo.configuredUsableSpace} 
totalUsage:$totalUsage totalSpace: $totalSpace")
-      diskInfo.setUsableSpace(workingDirUsableSpace)
-      diskInfo.setTotalSpace(totalSpace)
+      val minimumReserveSize =
+        DiskUtils.getMinimumUsableSize(diskInfo, diskReserveSize, 
diskReserveRatio)
+      val usableSpace = Math.max(workingDirUsableSpace - minimumReserveSize, 0)
+      logDebug(s"updateDiskInfos workingDirUsableSpace:$workingDirUsableSpace 
filemeta:$fileSystemReportedUsableSpace" +
+        s"conf:${diskInfo.configuredUsableSpace} totalUsage:$totalUsage 
totalSpace:$fileSystemReportedTotalSpace" +
+        s"minimumReserveSize:$minimumReserveSize usableSpace:$usableSpace")
+      diskInfo.setUsableSpace(usableSpace)
+      diskInfo.setTotalSpace(fileSystemReportedTotalSpace)
       diskInfo.updateFlushTime()
       diskInfo.updateFetchTime()
     }
     logInfo(s"Updated diskInfos:\n${disksSnapshot().mkString("\n")}")
   }
 
+  def getFileSystemReportedSpace(mountPoint: String): (Long, Long) = {
+    val fileStore = Files.getFileStore(Paths.get(mountPoint))
+    val fileSystemReportedUsableSpace = fileStore.getUsableSpace
+    val fileSystemReportedTotalSpace = fileStore.getTotalSpace
+    (fileSystemReportedUsableSpace, fileSystemReportedTotalSpace)
+  }
+
   def userResourceConsumptionSnapshot(): Map[UserIdentifier, 
ResourceConsumption] = {
     diskFileInfos.synchronized {
       // shuffleId -> (fileName -> fileInfo)
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
index 5b747e9b1..b85697552 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
@@ -17,12 +17,26 @@
 
 package org.apache.celeborn.service.deploy.worker.storage
 
+import org.mockito.{Mockito, MockitoSugar}
+import org.mockito.ArgumentMatchersSugar.any
+import org.mockito.stubbing.Stubber
+
 import org.apache.celeborn.CelebornFunSuite
 import org.apache.celeborn.common.CelebornConf
 import 
org.apache.celeborn.common.CelebornConf.{WORKER_GRACEFUL_SHUTDOWN_ENABLED, 
WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH}
+import org.apache.celeborn.common.meta.DiskInfo
+import org.apache.celeborn.common.util.DiskUtils
 import org.apache.celeborn.service.deploy.worker.WorkerSource
 
-class StorageManagerSuite extends CelebornFunSuite {
+trait MockitoHelper extends MockitoSugar {
+  def doReturn(toBeReturned: Any): Stubber = {
+    Mockito.doReturn(toBeReturned, Nil: _*)
+  }
+}
+
+class StorageManagerSuite extends CelebornFunSuite with MockitoHelper {
+
+  val conf = new CelebornConf()
 
   test("[CELEBORN-926] saveAllCommittedFileInfosToDB cause 
IllegalMonitorStateException") {
     val conf = new CelebornConf().set(WORKER_GRACEFUL_SHUTDOWN_ENABLED, true)
@@ -31,4 +45,44 @@ class StorageManagerSuite extends CelebornFunSuite {
     // should not throw IllegalMonitorStateException exception
     storageManager.saveAllCommittedFileInfosToDB()
   }
+
+  test("updateDiskInfosWithDiskReserveSize") {
+    val storageManager = new StorageManager(conf, new WorkerSource(conf))
+    val spyStorageManager = spy(storageManager)
+
+    val disks = prepareDisks()
+    val diskSetSpace = (80 * 1024 * 1024 * 1024L, 80 * 1024 * 1024 * 1024L)
+    doReturn(disks).when(spyStorageManager).disksSnapshot()
+    
doReturn(diskSetSpace).when(spyStorageManager).getFileSystemReportedSpace(any)
+    spyStorageManager.updateDiskInfos()
+    for (disk <- disks) {
+      val minimumReserveSize =
+        DiskUtils.getMinimumUsableSize(
+          disk,
+          conf.workerDiskReserveSize,
+          conf.workerDiskReserveRatio)
+      assert(disk.actualUsableSpace == diskSetSpace._1 - minimumReserveSize)
+    }
+  }
+
+  def prepareDisks(): List[DiskInfo] = {
+    val diskSetSpaces = Array(
+      90L * 1024 * 1024 * 1024,
+      95L * 1024 * 1024 * 1024,
+      100L * 1024 * 1024 * 1024)
+
+    val diskInfo1 = new DiskInfo("/mnt/disk1", List.empty, null, conf)
+    diskInfo1.configuredUsableSpace = (Long.MaxValue)
+    diskInfo1.setUsableSpace(diskSetSpaces(0))
+
+    val diskInfo2 = new DiskInfo("/mnt/disk2", List.empty, null, conf)
+    diskInfo2.configuredUsableSpace = (Long.MaxValue)
+    diskInfo2.setUsableSpace(diskSetSpaces(1))
+
+    val diskInfo3 = new DiskInfo("/mnt/disk3", List.empty, null, conf)
+    diskInfo3.configuredUsableSpace = (Long.MaxValue)
+    diskInfo3.setUsableSpace(diskSetSpaces(2))
+
+    List(diskInfo1, diskInfo2, diskInfo3)
+  }
 }

Reply via email to