This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 5e6a23fd8 [CELEBORN-868][MASTER] Adjust logic of SlotsAllocator#offerSlotsLoadAware fallback to roundrobin
5e6a23fd8 is described below
commit 5e6a23fd885f9383bd2370353d7b616ddbb4518c
Author: zwangsheng <[email protected]>
AuthorDate: Tue Aug 1 20:39:23 2023 +0800
[CELEBORN-868][MASTER] Adjust logic of SlotsAllocator#offerSlotsLoadAware fallback to roundrobin
### What changes were proposed in this pull request?
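Fall back to roundrobin when any of the following holds, checked in this order:
1. `usableDisks` is empty (no need to iterate over it)
2. replication is enabled and `usableDisks` has exactly one entry (fast fallback)
3. replication is enabled and the usable disks belong to at most one distinct worker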
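
The ordering above can be sketched as a short-circuit check that does the cheap tests first and only counts workers when it has to. This is an illustrative sketch, not the Celeborn implementation: the class `FallbackCheck`, the method `shouldFallback`, and the use of plain strings for disks and workers are assumptions made for the example; only `usableDisks`, `shouldReplicate`, and the disk-to-worker map come from the patch.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone class; strings stand in for DiskInfo and WorkerInfo.
class FallbackCheck {

  static boolean shouldFallback(
      List<String> usableDisks, Map<String, String> diskToWorker, boolean shouldReplicate) {
    // 1. No usable disks: fall back without iterating over anything.
    if (usableDisks.isEmpty()) {
      return true;
    }
    // Without replication, a single usable disk is sufficient.
    if (!shouldReplicate) {
      return false;
    }
    // 2. Replication needs two workers, and one disk belongs to exactly one worker.
    if (usableDisks.size() == 1) {
      return true;
    }
    // 3. Only now pay for the pass that counts distinct workers.
    return usableDisks.stream().map(diskToWorker::get).distinct().count() <= 1;
  }

  public static void main(String[] args) {
    Map<String, String> diskToWorker = Map.of("d1", "w1", "d2", "w1", "d3", "w2");
    // Two disks on the same worker cannot host a replica pair.
    System.out.println(shouldFallback(List.of("d1", "d2"), diskToWorker, true));
    // Two disks on different workers can.
    System.out.println(shouldFallback(List.of("d1", "d3"), diskToWorker, true));
  }
}
```

Compared with building a full set of workers up front, this shape skips the worker lookup entirely in the empty and single-disk cases.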
### Why are the changes needed?
To make the fallback logic clearer.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests.
Closes #1781 from zwangsheng/CELEBORN-868.
Authored-by: zwangsheng <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../celeborn/service/deploy/master/SlotsAllocator.java | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index cac3bdf13..05d833cb8 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -124,11 +124,13 @@ public class SlotsAllocator {
}
}));
- Set<WorkerInfo> usableWorkers = new HashSet<>();
- for (DiskInfo disk : usableDisks) {
- usableWorkers.add(diskToWorkerMap.get(disk));
- }
- if ((shouldReplicate && usableWorkers.size() <= 1) || usableDisks.isEmpty()) {
+ boolean shouldFallback =
+ usableDisks.isEmpty()
+ || (shouldReplicate
+ && (usableDisks.size() == 1
+ || usableDisks.stream().map(diskToWorkerMap::get).distinct().count() <= 1));
+
+ if (shouldFallback) {
logger.warn(
"offer slots for {} fallback to roundrobin because there is no usable disks",
StringUtils.join(partitionIds, ','));