RexXiong commented on code in PR #3365:
URL: https://github.com/apache/celeborn/pull/3365#discussion_r2283923249
##########
master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java:
##########
@@ -284,27 +289,49 @@ public void updateWorkerHeartbeatMeta(
}
// If using HDFSONLY mode, workers with empty disks should not be put into
excluded worker list.
- long unhealthyDiskNum =
- disks.values().stream().filter(s ->
!s.status().equals(DiskStatus.HEALTHY)).count();
- boolean exceed = unhealthyDiskNum * 1.0 / disks.size() >=
unhealthyDiskRatioThreshold;
+ Pair<Boolean, Long> exceedCheckResult =
isExceedingUnhealthyThreshold(disks);
if (!excludedWorkers.contains(worker)
- && (((disks.isEmpty() || exceed)
- && !conf.hasHDFSStorage()
- && !conf.hasS3Storage()
- && !conf.hasOssStorage())
+ && (((disks.isEmpty() || exceedCheckResult.getLeft()) &&
!hasRemoteStorage())
|| highWorkload)) {
LOG.warn(
- "Worker {} (unhealthy disks num: {}) adds to excluded workers",
worker, unhealthyDiskNum);
+ "Worker {} (unhealthy disks num: {}) adds to excluded workers",
+ worker,
+ exceedCheckResult.getRight());
excludedWorkers.add(worker);
- } else if ((availableSlots.get() > 0
- || conf.hasHDFSStorage()
- || conf.hasS3Storage()
- || conf.hasOssStorage())
- && !highWorkload) {
+ } else if ((availableSlots.get() > 0 || hasRemoteStorage()) &&
!highWorkload) {
// only unblack if numSlots larger than 0
excludedWorkers.remove(worker);
}
+ // release high work load workers when too many excluded workers
+ if (releaseHighWorkLoadEnabled
+ && excludedWorkers.size()
+ >= Math.floor(workersMap.size() *
releaseHighWorkLoadRatioThreshold)) {
+ synchronized (workersMap) {
+ List<WorkerInfo> toRemoved =
+ excludedWorkers.stream()
+ .filter(
+ w -> {
+ Optional<WorkerInfo> infoOpt =
Review Comment:
`Optional` seems redundant here.
##########
master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java:
##########
@@ -284,27 +289,49 @@ public void updateWorkerHeartbeatMeta(
}
// If using HDFSONLY mode, workers with empty disks should not be put into
excluded worker list.
- long unhealthyDiskNum =
- disks.values().stream().filter(s ->
!s.status().equals(DiskStatus.HEALTHY)).count();
- boolean exceed = unhealthyDiskNum * 1.0 / disks.size() >=
unhealthyDiskRatioThreshold;
+ Pair<Boolean, Long> exceedCheckResult =
isExceedingUnhealthyThreshold(disks);
if (!excludedWorkers.contains(worker)
- && (((disks.isEmpty() || exceed)
- && !conf.hasHDFSStorage()
- && !conf.hasS3Storage()
- && !conf.hasOssStorage())
+ && (((disks.isEmpty() || exceedCheckResult.getLeft()) &&
!hasRemoteStorage())
|| highWorkload)) {
LOG.warn(
- "Worker {} (unhealthy disks num: {}) adds to excluded workers",
worker, unhealthyDiskNum);
+ "Worker {} (unhealthy disks num: {}) adds to excluded workers",
+ worker,
+ exceedCheckResult.getRight());
excludedWorkers.add(worker);
- } else if ((availableSlots.get() > 0
- || conf.hasHDFSStorage()
- || conf.hasS3Storage()
- || conf.hasOssStorage())
- && !highWorkload) {
+ } else if ((availableSlots.get() > 0 || hasRemoteStorage()) &&
!highWorkload) {
// only unblack if numSlots larger than 0
excludedWorkers.remove(worker);
}
+ // release high work load workers when too many excluded workers
+ if (releaseHighWorkLoadEnabled
+ && excludedWorkers.size()
+ >= Math.floor(workersMap.size() *
releaseHighWorkLoadRatioThreshold)) {
+ synchronized (workersMap) {
+ List<WorkerInfo> toRemoved =
+ excludedWorkers.stream()
+ .filter(
+ w -> {
+ Optional<WorkerInfo> infoOpt =
+ Optional.ofNullable(workersMap.get(w.toUniqueId()));
+ if (infoOpt.isPresent()) {
+ WorkerInfo info = infoOpt.get();
+ Map<String, DiskInfo> diskInfos = info.diskInfos();
+ if (info.isHighWorkLoad()
Review Comment:
We can return the condition directly to avoid the `if`.
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -6421,6 +6425,25 @@ object CelebornConf extends Logging {
.checkValue(v => v > 0.0 && v <= 1.0, "Should be in (0.0, 1.0].")
.createWithDefault(1)
+ val MASTER_RELEASE_HIGH_WORKLOAD_WORKER_ENABLE: ConfigEntry[Boolean] =
Review Comment:
Suggest naming this `MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_ENABLED`.
##########
master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java:
##########
@@ -756,6 +756,182 @@ public void testHandleWorkerHeartbeat() {
assertEquals(statusSystem.availableWorkers.size(), 0);
}
+ @Test
+ public void testReleaseHighWorkLoadWorkers() {
Review Comment:
The Ratis MetaSystem tests should also cover this case.
##########
master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java:
##########
@@ -284,27 +289,49 @@ public void updateWorkerHeartbeatMeta(
}
// If using HDFSONLY mode, workers with empty disks should not be put into
excluded worker list.
- long unhealthyDiskNum =
- disks.values().stream().filter(s ->
!s.status().equals(DiskStatus.HEALTHY)).count();
- boolean exceed = unhealthyDiskNum * 1.0 / disks.size() >=
unhealthyDiskRatioThreshold;
+ Pair<Boolean, Long> exceedCheckResult =
isExceedingUnhealthyThreshold(disks);
if (!excludedWorkers.contains(worker)
- && (((disks.isEmpty() || exceed)
- && !conf.hasHDFSStorage()
- && !conf.hasS3Storage()
- && !conf.hasOssStorage())
+ && (((disks.isEmpty() || exceedCheckResult.getLeft()) &&
!hasRemoteStorage())
Review Comment:
Let's extract a method `hasStorage()` for this `((disks.isEmpty() ||
exceedCheckResult.getLeft()) && !hasRemoteStorage())`, e.g. `!hasStorage()`.
##########
master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java:
##########
@@ -643,4 +670,15 @@ public void updateWorkerResourceConsumptions(
workerInfo.ifPresent(info ->
info.updateThenGetUserResourceConsumption(resourceConsumptions));
}
}
+
+ private boolean hasRemoteStorage() {
Review Comment:
Use a variable for this to avoid repeated calculations.
##########
master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java:
##########
@@ -284,27 +289,49 @@ public void updateWorkerHeartbeatMeta(
}
// If using HDFSONLY mode, workers with empty disks should not be put into
excluded worker list.
- long unhealthyDiskNum =
- disks.values().stream().filter(s ->
!s.status().equals(DiskStatus.HEALTHY)).count();
- boolean exceed = unhealthyDiskNum * 1.0 / disks.size() >=
unhealthyDiskRatioThreshold;
+ Pair<Boolean, Long> exceedCheckResult =
isExceedingUnhealthyThreshold(disks);
if (!excludedWorkers.contains(worker)
- && (((disks.isEmpty() || exceed)
- && !conf.hasHDFSStorage()
- && !conf.hasS3Storage()
- && !conf.hasOssStorage())
+ && (((disks.isEmpty() || exceedCheckResult.getLeft()) &&
!hasRemoteStorage())
|| highWorkload)) {
LOG.warn(
- "Worker {} (unhealthy disks num: {}) adds to excluded workers",
worker, unhealthyDiskNum);
+ "Worker {} (unhealthy disks num: {}) adds to excluded workers",
+ worker,
+ exceedCheckResult.getRight());
excludedWorkers.add(worker);
- } else if ((availableSlots.get() > 0
- || conf.hasHDFSStorage()
- || conf.hasS3Storage()
- || conf.hasOssStorage())
- && !highWorkload) {
+ } else if ((availableSlots.get() > 0 || hasRemoteStorage()) &&
!highWorkload) {
// only unblack if numSlots larger than 0
excludedWorkers.remove(worker);
}
+ // release high work load workers when too many excluded workers
+ if (releaseHighWorkLoadEnabled
+ && excludedWorkers.size()
+ >= Math.floor(workersMap.size() *
releaseHighWorkLoadRatioThreshold)) {
+ synchronized (workersMap) {
+ List<WorkerInfo> toRemoved =
+ excludedWorkers.stream()
+ .filter(
+ w -> {
+ Optional<WorkerInfo> infoOpt =
+ Optional.ofNullable(workersMap.get(w.toUniqueId()));
+ if (infoOpt.isPresent()) {
+ WorkerInfo info = infoOpt.get();
+ Map<String, DiskInfo> diskInfos = info.diskInfos();
+ if (info.isHighWorkLoad()
+ && (hasRemoteStorage()
Review Comment:
This should be `hasRemoteStorage()
|| (!diskInfos.isEmpty()
&& !isExceedingUnhealthyThreshold(diskInfos).getLeft())`. Could we also use
`hasStorage()` here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]