This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 5e6a23fd8 [CELEBORN-868][MASTER] Adjust logic of SlotsAllocator#offerSlotsLoadAware fallback to roundrobin
5e6a23fd8 is described below

commit 5e6a23fd885f9383bd2370353d7b616ddbb4518c
Author: zwangsheng <2213335...@qq.com>
AuthorDate: Tue Aug 1 20:39:23 2023 +0800

    [CELEBORN-868][MASTER] Adjust logic of SlotsAllocator#offerSlotsLoadAware fallback to roundrobin
    
    ### What changes were proposed in this pull request?
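    Fall back to roundrobin when any of the following holds, checked in this order:
    
    1. usableDisks is empty (no need to iterate over it)
    2. replication is enabled and usableDisks.size() == 1 (fast fallback)
    3. replication is enabled and the usable disks belong to at most one distinct worker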
    
    ### Why are the changes needed?
    To make the fallback logic here clearer.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit Test
    
    Closes #1781 from zwangsheng/CELEBORN-868.
    
    Authored-by: zwangsheng <2213335...@qq.com>
    Signed-off-by: zky.zhoukeyong <zky.zhoukey...@alibaba-inc.com>
---
 .../celeborn/service/deploy/master/SlotsAllocator.java       | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index cac3bdf13..05d833cb8 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -124,11 +124,13 @@ public class SlotsAllocator {
                       }
                     }));
 
-    Set<WorkerInfo> usableWorkers = new HashSet<>();
-    for (DiskInfo disk : usableDisks) {
-      usableWorkers.add(diskToWorkerMap.get(disk));
-    }
-    if ((shouldReplicate && usableWorkers.size() <= 1) || usableDisks.isEmpty()) {
+    boolean shouldFallback =
+        usableDisks.isEmpty()
+            || (shouldReplicate
+                && (usableDisks.size() == 1
+                    || usableDisks.stream().map(diskToWorkerMap::get).distinct().count() <= 1));
+
+    if (shouldFallback) {
       logger.warn(
           "offer slots for {} fallback to roundrobin because there is no usable disks",
           StringUtils.join(partitionIds, ','));
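
The short-circuit order of the new condition can be tried in isolation. The following is a minimal sketch, not the Celeborn code: the class name `FallbackCheck` and the use of plain strings for disks and workers (instead of `DiskInfo` and `WorkerInfo`) are assumptions made for illustration only. The boolean expression itself mirrors the one in the diff above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FallbackCheck {
  // Hypothetical stand-in: disks and workers are plain strings here,
  // not Celeborn's DiskInfo / WorkerInfo types.
  static boolean shouldFallback(
      List<String> usableDisks, Map<String, String> diskToWorkerMap, boolean shouldReplicate) {
    return usableDisks.isEmpty() // 1. nothing usable, no iteration needed
        || (shouldReplicate
            && (usableDisks.size() == 1 // 2. a single disk cannot host both replicas
                // 3. all usable disks sit on at most one distinct worker
                || usableDisks.stream().map(diskToWorkerMap::get).distinct().count() <= 1));
  }

  public static void main(String[] args) {
    Map<String, String> diskToWorkerMap = new HashMap<>();
    diskToWorkerMap.put("d1", "w1");
    diskToWorkerMap.put("d2", "w1");
    diskToWorkerMap.put("d3", "w2");

    // Empty disk list: fallback regardless of replication.
    System.out.println(shouldFallback(Collections.emptyList(), diskToWorkerMap, false)); // true
    // One disk with replication: fast fallback.
    System.out.println(shouldFallback(Arrays.asList("d1"), diskToWorkerMap, true)); // true
    // One disk without replication: no fallback.
    System.out.println(shouldFallback(Arrays.asList("d1"), diskToWorkerMap, false)); // false
    // Two disks on the same worker with replication: fallback.
    System.out.println(shouldFallback(Arrays.asList("d1", "d2"), diskToWorkerMap, true)); // true
    // Two disks on different workers with replication: no fallback.
    System.out.println(shouldFallback(Arrays.asList("d1", "d3"), diskToWorkerMap, true)); // false
  }
}
```

Because `||` and `&&` short-circuit, the stream over `usableDisks` runs only when replication is enabled and more than one disk is usable, which is the ordering the commit message describes.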
