This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new b330b550b [CELEBORN-1557] Fix totalSpace of DiskInfo for Master in HA
mode
b330b550b is described below
commit b330b550ba28e1b1d7fbe50993afd8ce5aa1fac8
Author: SteNicholas <[email protected]>
AuthorDate: Mon Aug 19 16:18:17 2024 +0800
[CELEBORN-1557] Fix totalSpace of DiskInfo for Master in HA mode
### What changes were proposed in this pull request?
Fix `totalSpace` of `DiskInfo` for Master in HA mode.
### Why are the changes needed?
The `totalSpace` of `DiskInfo` is not synced for Master in HA mode, which
causes the `totalSpace` to be incorrect.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`RatisMasterStatusSystemSuiteJ#testHandleRegisterWorker`
Closes #2690 from SteNicholas/CELEBORN-1557.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../deploy/master/clustermeta/MetaUtil.java | 16 +++--
master/src/main/proto/Resource.proto | 1 +
.../ha/RatisMasterStatusSystemSuiteJ.java | 79 ++++++++++++++++++----
.../deploy/worker/storage/StorageManager.scala | 9 +--
4 files changed, 81 insertions(+), 24 deletions(-)
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java
index 6878e047c..1937dec25 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java
@@ -63,13 +63,14 @@ public class MetaUtil {
(k, v) -> {
DiskInfo diskInfo =
new DiskInfo(
- v.getMountPoint(),
- v.getUsableSpace(),
- v.getAvgFlushTime(),
- v.getAvgFetchTime(),
- v.getUsedSlots(),
- StorageInfo.typesMap.get(v.getStorageType()));
- diskInfo.setStatus(Utils.toDiskStatus(v.getStatus()));
+ v.getMountPoint(),
+ v.getUsableSpace(),
+ v.getAvgFlushTime(),
+ v.getAvgFetchTime(),
+ v.getUsedSlots(),
+ StorageInfo.typesMap.get(v.getStorageType()))
+ .setStatus(Utils.toDiskStatus(v.getStatus()))
+ .setTotalSpace(v.getTotalSpace());
map.put(k, diskInfo);
});
return map;
@@ -90,6 +91,7 @@ public class MetaUtil {
.setUsedSlots(v.activeSlots())
.setStorageType(v.storageType().getValue())
.setStatus(v.status().getValue())
+ .setTotalSpace(v.totalSpace())
.build()));
return map;
}
diff --git a/master/src/main/proto/Resource.proto
b/master/src/main/proto/Resource.proto
index 4a944748f..2c129f61e 100644
--- a/master/src/main/proto/Resource.proto
+++ b/master/src/main/proto/Resource.proto
@@ -83,6 +83,7 @@ message DiskInfo {
required int32 status = 5;
required int64 avgFetchTime = 6;
required int32 storageType =7;
+ optional int64 totalSpace = 8;
}
message RequestSlotsRequest {
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index dcf48ced6..72949f5d1 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -347,10 +347,27 @@ public class RatisMasterStatusSystemSuiteJ {
getNewReqeustId());
Thread.sleep(3000L);
- Assert.assertEquals(STATUSSYSTEM1.workers.size(), 3);
Assert.assertEquals(3, STATUSSYSTEM1.workers.size());
Assert.assertEquals(3, STATUSSYSTEM2.workers.size());
Assert.assertEquals(3, STATUSSYSTEM3.workers.size());
+
+ assertWorkers(STATUSSYSTEM1.workers);
+ assertWorkers(STATUSSYSTEM2.workers);
+ assertWorkers(STATUSSYSTEM3.workers);
+ }
+
+ private void assertWorkers(Set<WorkerInfo> workerInfos) {
+ for (WorkerInfo workerInfo : workerInfos) {
+ assertWorker(workerInfo);
+ }
+ }
+
+ private void assertWorker(WorkerInfo workerInfo) {
+ Map<String, DiskInfo> diskInfos = workerInfo.diskInfos();
+ Assert.assertEquals(96 * 1024 * 1024 * 1024L,
diskInfos.get("disk1").totalSpace());
+ Assert.assertEquals(96 * 1024 * 1024 * 1024L,
diskInfos.get("disk2").totalSpace());
+ Assert.assertEquals(96 * 1024 * 1024 * 1024L,
diskInfos.get("disk3").totalSpace());
+ Assert.assertEquals(96 * 1024 * 1024 * 1024L,
diskInfos.get("disk4").totalSpace());
}
@Test
@@ -994,22 +1011,58 @@ public class RatisMasterStatusSystemSuiteJ {
STATUSSYSTEM3.workerLostEvents.clear();
disks1.clear();
- disks1.put("disk1", new DiskInfo("disk1", 64 * 1024 * 1024 * 1024L, 100,
100, 0));
- disks1.put("disk2", new DiskInfo("disk2", 64 * 1024 * 1024 * 1024L, 100,
100, 0));
- disks1.put("disk3", new DiskInfo("disk3", 64 * 1024 * 1024 * 1024L, 100,
100, 0));
- disks1.put("disk4", new DiskInfo("disk4", 64 * 1024 * 1024 * 1024L, 100,
100, 0));
+ disks1.put(
+ "disk1",
+ new DiskInfo("disk1", 64 * 1024 * 1024 * 1024L, 100, 100, 0)
+ .setTotalSpace(96 * 1024 * 1024 * 1024L));
+ disks1.put(
+ "disk2",
+ new DiskInfo("disk2", 64 * 1024 * 1024 * 1024L, 100, 100, 0)
+ .setTotalSpace(96 * 1024 * 1024 * 1024L));
+ disks1.put(
+ "disk3",
+ new DiskInfo("disk3", 64 * 1024 * 1024 * 1024L, 100, 100, 0)
+ .setTotalSpace(96 * 1024 * 1024 * 1024L));
+ disks1.put(
+ "disk4",
+ new DiskInfo("disk4", 64 * 1024 * 1024 * 1024L, 100, 100, 0)
+ .setTotalSpace(96 * 1024 * 1024 * 1024L));
disks2.clear();
- disks2.put("disk1", new DiskInfo("disk1", 64 * 1024 * 1024 * 1024L, 100,
100, 0));
- disks2.put("disk2", new DiskInfo("disk2", 64 * 1024 * 1024 * 1024L, 100,
100, 0));
- disks2.put("disk3", new DiskInfo("disk3", 64 * 1024 * 1024 * 1024L, 100,
100, 0));
- disks2.put("disk4", new DiskInfo("disk4", 64 * 1024 * 1024 * 1024L, 100,
100, 0));
+ disks2.put(
+ "disk1",
+ new DiskInfo("disk1", 64 * 1024 * 1024 * 1024L, 100, 100, 0)
+ .setTotalSpace(96 * 1024 * 1024 * 1024L));
+ disks2.put(
+ "disk2",
+ new DiskInfo("disk2", 64 * 1024 * 1024 * 1024L, 100, 100, 0)
+ .setTotalSpace(96 * 1024 * 1024 * 1024L));
+ disks2.put(
+ "disk3",
+ new DiskInfo("disk3", 64 * 1024 * 1024 * 1024L, 100, 100, 0)
+ .setTotalSpace(96 * 1024 * 1024 * 1024L));
+ disks2.put(
+ "disk4",
+ new DiskInfo("disk4", 64 * 1024 * 1024 * 1024L, 100, 100, 0)
+ .setTotalSpace(96 * 1024 * 1024 * 1024L));
disks3.clear();
- disks3.put("disk1", new DiskInfo("disk1", 64 * 1024 * 1024 * 1024L, 100,
100, 0));
- disks3.put("disk2", new DiskInfo("disk2", 64 * 1024 * 1024 * 1024L, 100,
100, 0));
- disks3.put("disk3", new DiskInfo("disk3", 64 * 1024 * 1024 * 1024L, 100,
100, 0));
- disks3.put("disk4", new DiskInfo("disk4", 64 * 1024 * 1024 * 1024L, 100,
100, 0));
+ disks3.put(
+ "disk1",
+ new DiskInfo("disk1", 64 * 1024 * 1024 * 1024L, 100, 100, 0)
+ .setTotalSpace(96 * 1024 * 1024 * 1024L));
+ disks3.put(
+ "disk2",
+ new DiskInfo("disk2", 64 * 1024 * 1024 * 1024L, 100, 100, 0)
+ .setTotalSpace(96 * 1024 * 1024 * 1024L));
+ disks3.put(
+ "disk3",
+ new DiskInfo("disk3", 64 * 1024 * 1024 * 1024L, 100, 100, 0)
+ .setTotalSpace(96 * 1024 * 1024 * 1024L));
+ disks3.put(
+ "disk4",
+ new DiskInfo("disk4", 64 * 1024 * 1024 * 1024L, 100, 100, 0)
+ .setTotalSpace(96 * 1024 * 1024 * 1024L));
}
@Test
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index e278041f1..b7a5eb75b 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -851,13 +851,14 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
0
}
}.sum
- val fileSystemReportedUsableSpace = Files.getFileStore(
- Paths.get(diskInfo.mountPoint)).getUsableSpace
+ val fileStore = Files.getFileStore(Paths.get(diskInfo.mountPoint))
+ val fileSystemReportedUsableSpace = fileStore.getUsableSpace
val workingDirUsableSpace =
Math.min(diskInfo.configuredUsableSpace - totalUsage,
fileSystemReportedUsableSpace)
- logDebug(s"updateDiskInfos workingDirUsableSpace:$workingDirUsableSpace
filemeta:$fileSystemReportedUsableSpace conf:${diskInfo.configuredUsableSpace}
totalUsage:$totalUsage")
+ val totalSpace = fileStore.getTotalSpace
+ logDebug(s"updateDiskInfos workingDirUsableSpace:$workingDirUsableSpace
filemeta:$fileSystemReportedUsableSpace conf:${diskInfo.configuredUsableSpace}
totalUsage:$totalUsage totalSpace: $totalSpace")
diskInfo.setUsableSpace(workingDirUsableSpace)
- diskInfo.setTotalSpace(totalUsage + workingDirUsableSpace)
+ diskInfo.setTotalSpace(totalSpace)
diskInfo.updateFlushTime()
diskInfo.updateFetchTime()
}