This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

The following commit(s) were added to refs/heads/main by this push:
     new 5e6a23fd8 [CELEBORN-868][MASTER] Adjust logic of SlotsAllocator#offerSlotsLoadAware fallback to roundrobin
5e6a23fd8 is described below

commit 5e6a23fd885f9383bd2370353d7b616ddbb4518c
Author: zwangsheng <2213335...@qq.com>
AuthorDate: Tue Aug 1 20:39:23 2023 +0800

    [CELEBORN-868][MASTER] Adjust logic of SlotsAllocator#offerSlotsLoadAware fallback to roundrobin

    ### What changes were proposed in this pull request?
    Fallback in following order:
    1. usableDisks is empty (no need to call iter)
    2. under replicate case, first usableDisks == 1 fast fallback
    3. count distinct worker

    ### Why are the changes needed?
    Clear about the logic here

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Unit Test

    Closes #1781 from zwangsheng/CELEBORN-868.

    Authored-by: zwangsheng <2213335...@qq.com>
    Signed-off-by: zky.zhoukeyong <zky.zhoukey...@alibaba-inc.com>
---
 .../celeborn/service/deploy/master/SlotsAllocator.java       | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index cac3bdf13..05d833cb8 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -124,11 +124,13 @@ public class SlotsAllocator {
               }
             }));
 
-    Set<WorkerInfo> usableWorkers = new HashSet<>();
-    for (DiskInfo disk : usableDisks) {
-      usableWorkers.add(diskToWorkerMap.get(disk));
-    }
-    if ((shouldReplicate && usableWorkers.size() <= 1) || usableDisks.isEmpty()) {
+    boolean shouldFallback =
+        usableDisks.isEmpty()
+            || (shouldReplicate
+                && (usableDisks.size() == 1
+                    || usableDisks.stream().map(diskToWorkerMap::get).distinct().count() <= 1));
+
+    if (shouldFallback) {
       logger.warn(
           "offer slots for {} fallback to roundrobin because there is no usable disks",
           StringUtils.join(partitionIds, ','));
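
The fallback order described in the commit message can be tried out in isolation. The following is a minimal sketch, not the Celeborn implementation: it assumes disks and workers are represented as plain strings instead of `DiskInfo` and `WorkerInfo`, and the class name `FallbackSketch` and the standalone `shouldFallback` method are hypothetical names introduced here for illustration. The checks run from cheapest to most expensive, so the stream over the disks is only evaluated when the earlier conditions do not already decide the result.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical standalone class; in the commit the logic lives inline in SlotsAllocator.
class FallbackSketch {

  // Assumption: disks and workers are modelled as Strings instead of DiskInfo/WorkerInfo.
  static boolean shouldFallback(
      List<String> usableDisks, Map<String, String> diskToWorkerMap, boolean shouldReplicate) {
    // 1. No usable disks at all: fall back without iterating.
    if (usableDisks.isEmpty()) {
      return true;
    }
    // Without replication, any usable disk is enough.
    if (!shouldReplicate) {
      return false;
    }
    // 2. With replication, a single disk can never span two workers.
    if (usableDisks.size() == 1) {
      return true;
    }
    // 3. Otherwise count the distinct workers that own the usable disks.
    return usableDisks.stream().map(diskToWorkerMap::get).distinct().count() <= 1;
  }

  public static void main(String[] args) {
    Map<String, String> diskToWorker = new HashMap<>();
    diskToWorker.put("disk1", "workerA");
    diskToWorker.put("disk2", "workerA");
    diskToWorker.put("disk3", "workerB");

    // No usable disks: prints true
    System.out.println(shouldFallback(Collections.emptyList(), diskToWorker, false));
    // Replication with two disks on the same worker: prints true
    System.out.println(shouldFallback(Arrays.asList("disk1", "disk2"), diskToWorker, true));
    // Replication with disks on two different workers: prints false
    System.out.println(shouldFallback(Arrays.asList("disk1", "disk3"), diskToWorker, true));
  }
}
```

For these inputs the sketch agrees with the removed `HashSet`-based condition; the rewritten expression only changes how early the decision can be made.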