This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.1
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.1 by this push:
     new f7fdd173 [ISSUE-601] limit the max workers while offerSlots (#1096)
f7fdd173 is described below

commit f7fdd1733c430ea1f7e1db14af8e2debeec000f1
Author: jxysoft <[email protected]>
AuthorDate: Wed Dec 21 13:07:36 2022 +0800

    [ISSUE-601] limit the max workers while offerSlots (#1096)
    
    Co-authored-by: xianyao.jiang <[email protected]>
---
 .../scala/com/aliyun/emr/rss/common/RssConf.scala  |  18 +++
 .../emr/rss/service/deploy/master/MasterUtil.java  |  65 +++++++--
 .../emr/rss/service/deploy/master/Master.scala     |   3 +-
 .../service/deploy/master/MasterUtilSuiteJ.java    | 150 ++++++++++++++++++++-
 4 files changed, 216 insertions(+), 20 deletions(-)

diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala b/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala
index 1e3f6298..b41b48ff 100644
--- a/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala
+++ b/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala
@@ -652,6 +652,24 @@ object RssConf extends Logging {
     conf.getInt("rss.offer.slots.extra.size", 2)
   }
 
+  def offerSlotsOrderByFreeSlots(conf: RssConf): Boolean = {
+    conf.getBoolean("rss.offer.slots.orderByFreeSlots", false)
+  }
+
+  def offerSlotsMaxWorkers(conf: RssConf): Int = {
+    // -1 unlimited
+    conf.getInt("rss.offer.slots.maxWorkers", -1)
+  }
+
+  def offerSlotsMinWorkers(conf: RssConf): Int = {
+    conf.getInt("rss.offer.slots.minWorkers", 1)
+  }
+
+  def offerSlotsMinPartitionsPerWorker(conf: RssConf): Long = {
+    // -1 unlimited
+    conf.getLong("rss.offer.slots.minPartitionsPerWorker", -1)
+  }
+
   def shuffleWriterMode(conf: RssConf): String = {
     conf.get("rss.shuffle.writer.mode", "hash")
   }
diff --git a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/MasterUtil.java b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/MasterUtil.java
index 846921c2..39d8aee9 100644
--- a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/MasterUtil.java
+++ b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/MasterUtil.java
@@ -27,6 +27,7 @@ import java.util.Random;
 
 import scala.Tuple2;
 
+import com.aliyun.emr.rss.common.RssConf;
 import com.aliyun.emr.rss.common.meta.WorkerInfo;
 import com.aliyun.emr.rss.common.protocol.PartitionLocation;
 
@@ -50,22 +51,62 @@ public class MasterUtil {
       String shuffleKey,
       List<WorkerInfo> workers,
       List<Integer> reduceIds,
-      boolean shouldReplicate) {
-    int[] oldEpochs = new int[reduceIds.size()];
-    Arrays.fill(oldEpochs, -1);
-    return offerSlots(shuffleKey, workers, reduceIds, oldEpochs, shouldReplicate);
+      boolean shouldReplicate,
+      RssConf conf) {
+    if (workers.size() < 1 || workers.size() < 2 && shouldReplicate) {
+      return null;
+    }
+
+    int targetSlots = shouldReplicate ? reduceIds.size() * 2 : reduceIds.size();
+
+    // get max number of workers
+    int maxWorkerNums = workers.size();
+    long offerSlotsMinPartitionsPerWorker = RssConf.offerSlotsMinPartitionsPerWorker(conf);
+    if (offerSlotsMinPartitionsPerWorker > 0) {
+      int workerNums = (int) ((targetSlots +
+              offerSlotsMinPartitionsPerWorker - 1) / offerSlotsMinPartitionsPerWorker);
+      maxWorkerNums = Math.min(maxWorkerNums, workerNums);
+    }
+
+    int offerSlotsMaxWorkers = RssConf.offerSlotsMaxWorkers(conf);
+    if (offerSlotsMaxWorkers > 0) {
+      maxWorkerNums = Math.min(maxWorkerNums, offerSlotsMaxWorkers);
+    }
+
+    int minWorkerNums = Math.min(targetSlots, Math.max(RssConf.offerSlotsMinWorkers(conf), 1));
+    if (shouldReplicate) {
+      minWorkerNums = Math.max(2, minWorkerNums);
+    }
+    maxWorkerNums = Math.max(minWorkerNums, maxWorkerNums);
+
+    // choose max number of workers
+    if (maxWorkerNums < workers.size()) {
+      if (RssConf.offerSlotsOrderByFreeSlots(conf)) {
+        workers.sort((o1, o2) -> o2.freeSlots() - o1.freeSlots());
+      }
+      List<WorkerInfo> newWorkers = workers.subList(0, maxWorkerNums);
+      Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> res =
+              doOfferSlots(newWorkers, reduceIds, shouldReplicate, targetSlots);
+      if( res != null) {
+        return res;
+      }
+    }
+
+    // fallback to original
+    return doOfferSlots(workers, reduceIds, shouldReplicate, targetSlots);
   }
 
-  public static Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>>
-    offerSlots(
-      String shuffleKey,
+  private static Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>>
+    doOfferSlots(
       List<WorkerInfo> workers,
       List<Integer> reduceIds,
-      int[] oldEpochs,
-      boolean shouldReplicate) {
-    if (workers.size() < 2 && shouldReplicate) {
-      return null;
-    }
+      boolean shouldReplicate,
+      long targetSlots) {
+    long totalFreeSlots = workers.stream().mapToLong(i -> i.freeSlots()).sum();
+    if (totalFreeSlots < targetSlots) return null;
+
+    int[] oldEpochs = new int[reduceIds.size()];
+    Arrays.fill(oldEpochs, -1);
 
     int masterInd = rand.nextInt(workers.size());
     Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> slots =
diff --git a/server-master/src/main/scala/com/aliyun/emr/rss/service/deploy/master/Master.scala b/server-master/src/main/scala/com/aliyun/emr/rss/service/deploy/master/Master.scala
index 0d38d250..5012bab8 100644
--- a/server-master/src/main/scala/com/aliyun/emr/rss/service/deploy/master/Master.scala
+++ b/server-master/src/main/scala/com/aliyun/emr/rss/service/deploy/master/Master.scala
@@ -332,7 +332,8 @@ private[deploy] class Master(
         shuffleKey,
         workersNotBlacklisted(),
         requestSlots.reduceIdList,
-        requestSlots.shouldReplicate
+        requestSlots.shouldReplicate,
+        conf
       )
     }
 
diff --git a/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/MasterUtilSuiteJ.java b/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/MasterUtilSuiteJ.java
index 33844425..61dae921 100644
--- a/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/MasterUtilSuiteJ.java
+++ b/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/MasterUtilSuiteJ.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import com.aliyun.emr.rss.common.RssConf;
 import scala.Tuple2;
 
 import org.junit.Test;
@@ -31,6 +32,7 @@ import com.aliyun.emr.rss.common.meta.WorkerInfo;
 import com.aliyun.emr.rss.common.protocol.PartitionLocation;
 
 public class MasterUtilSuiteJ {
+  RssConf rssConf = new RssConf();
 
   private List<WorkerInfo> prepareWorkers(int numSlots) {
     ArrayList<WorkerInfo> workers = new ArrayList<>(3);
@@ -47,7 +49,7 @@ public class MasterUtilSuiteJ {
     final List<Integer> reduceIds = Collections.emptyList();
     final boolean shouldReplicate = true;
 
-    check(0, 3, workers, reduceIds, shouldReplicate, true);
+    check(0, 3, workers, reduceIds, shouldReplicate, true, rssConf);
   }
 
   @Test
@@ -57,7 +59,7 @@ public class MasterUtilSuiteJ {
     final List<Integer> reduceIds = Collections.singletonList(0);
     final boolean shouldReplicate = true;
 
-    check(2, 1, workers, reduceIds, shouldReplicate, true);
+    check(2, 1, workers, reduceIds, shouldReplicate, true, rssConf);
   }
 
   @Test
@@ -67,7 +69,7 @@ public class MasterUtilSuiteJ {
     final List<Integer> reduceIds = Collections.singletonList(0);
     final boolean shouldReplicate = false;
 
-    check(1, 2, workers, reduceIds, shouldReplicate, true);
+    check(1, 2, workers, reduceIds, shouldReplicate, true, rssConf);
   }
 
   @Test
@@ -77,7 +79,7 @@ public class MasterUtilSuiteJ {
     final List<Integer> reduceIds = Arrays.asList(0, 1);
     final boolean shouldReplicate = false;
 
-    check(2, 1, workers, reduceIds, shouldReplicate, true);
+    check(2, 1, workers, reduceIds, shouldReplicate, true, rssConf);
   }
 
   @Test
@@ -87,7 +89,140 @@ public class MasterUtilSuiteJ {
     final List<Integer> reduceIds = Arrays.asList(0, 1, 2);
     final boolean shouldReplicate = false;
 
-    check(3, 0, workers, reduceIds, shouldReplicate, true);
+    check(3, 0, workers, reduceIds, shouldReplicate, true, rssConf);
+  }
+
+  public void testAllocateSlotsForOrderByFreeSlots() {
+    final List<WorkerInfo> workers = new ArrayList<>(3);
+    workers.add(new WorkerInfo("host1", 9, 10, 110, 113, 1, null));
+    workers.add(new WorkerInfo("host2", 9, 11, 111, 114, 2, null));
+    workers.add(new WorkerInfo("host3", 9, 12, 112, 115, 3, null));
+    final List<Integer> reduceIds = Arrays.asList(0, 1, 2);
+    final boolean shouldReplicate = false;
+
+    RssConf conf = new RssConf();
+    conf.set("rss.offer.slots.orderByFreeSlots", "true");
+    check(3, 3, workers, reduceIds, shouldReplicate, true, conf);
+  }
+
+  @Test
+  public void testAllocateSlotsByReduceIdsWithoutReplicate() {
+    RssConf conf = new RssConf();
+    conf.set("rss.offer.slots.minWorkers", "3");
+    conf.set("rss.offer.slots.maxWorkers", "6");
+    conf.set("rss.offer.slots.minPartitionsPerWorker", "2");
+
+    List<Integer> reduceIds = null;
+    boolean shouldReplicate = false;
+    int totalSlots = 10 * 3;
+
+    List<WorkerInfo> workers = genWorkers(10, 3);
+    reduceIds = genReduceIds(1);
+    check(1, totalSlots - 1, workers, reduceIds, shouldReplicate, true, conf);
+
+    workers = genWorkers(10, 3);
+    reduceIds = genReduceIds(3);
+    check(3, totalSlots - 3, workers, reduceIds, shouldReplicate, true, conf);
+
+    workers = genWorkers(10, 3);
+    reduceIds = genReduceIds(4);
+    check(3, totalSlots - 4, workers, reduceIds, shouldReplicate, true, conf);
+
+    workers = genWorkers(10, 3);
+    reduceIds = genReduceIds(6);
+    check(3, totalSlots - 6, workers, reduceIds, shouldReplicate, true, conf);
+
+    workers = genWorkers(10, 3);
+    reduceIds = genReduceIds(7);
+    check(4, totalSlots - 7, workers, reduceIds, shouldReplicate, true, conf);
+
+    workers = genWorkers(10, 3);
+    reduceIds = genReduceIds(12);
+    check(6, totalSlots - 12, workers, reduceIds, shouldReplicate, true, conf);
+
+    workers = genWorkers(10, 3);
+    reduceIds = genReduceIds(13);
+    check(6, totalSlots - 13, workers, reduceIds, shouldReplicate, true, conf);
+
+    workers = genWorkers(10, 3);
+    reduceIds = genReduceIds(18);
+    check(6, totalSlots - 18, workers, reduceIds, shouldReplicate, true, conf);
+
+    workers = genWorkers(10, 3);
+    reduceIds = genReduceIds(19);
+    check(10, totalSlots - 19, workers, reduceIds, shouldReplicate, true, conf);
+
+    workers = genWorkers(10, 3);
+    reduceIds = genReduceIds(30);
+    check(10, totalSlots - 30, workers, reduceIds, shouldReplicate, true, conf);
+
+    workers = genWorkers(10, 3);
+    reduceIds = genReduceIds(31);
+    check(0, totalSlots, workers, reduceIds, shouldReplicate, false, conf);
+  }
+
+  @Test
+  public void testAllocateSlotsByReduceIdsWithReplicate() {
+    RssConf conf = new RssConf();
+    conf.set("rss.offer.slots.minWorkers", "3");
+    conf.set("rss.offer.slots.maxWorkers", "6");
+    conf.set("rss.offer.slots.minPartitionsPerWorker", "2");
+
+    List<Integer> reduceIds = null;
+    boolean shouldReplicate = true;
+    int totalSlots = 10 * 3;
+
+    List<WorkerInfo> workers = genWorkers(10, 3);
+    reduceIds = genReduceIds(1);
+    check(2, totalSlots - 2, workers, reduceIds, shouldReplicate, true, conf);
+
+    workers = genWorkers(10, 3);
+    reduceIds = genReduceIds(3);
+    check(3, totalSlots - 6, workers, reduceIds, shouldReplicate, true, conf);
+
+    workers = genWorkers(10, 3);
+    reduceIds = genReduceIds(4);
+    check(4, totalSlots - 8, workers, reduceIds, shouldReplicate, true, conf);
+
+    workers = genWorkers(10, 3);
+    reduceIds = genReduceIds(6);
+    check(6, totalSlots - 12, workers, reduceIds, shouldReplicate, true, conf);
+
+    workers = genWorkers(10, 3);
+    reduceIds = genReduceIds(7);
+    check(6, totalSlots - 14, workers, reduceIds, shouldReplicate, true, conf);
+
+    workers = genWorkers(10, 3);
+    reduceIds = genReduceIds(9);
+    check(6, totalSlots - 18, workers, reduceIds, shouldReplicate, true, conf);
+
+    workers = genWorkers(10, 3);
+    reduceIds = genReduceIds(10);
+    check(10, totalSlots - 20, workers, reduceIds, shouldReplicate, true, conf);
+
+    workers = genWorkers(10, 3);
+    reduceIds = genReduceIds(15);
+    check(10, totalSlots - 30, workers, reduceIds, shouldReplicate, true, conf);
+
+    workers = genWorkers(10, 3);
+    reduceIds = genReduceIds(16);
+    check(0, totalSlots, workers, reduceIds, shouldReplicate, false, conf);
+  }
+
+  private List<WorkerInfo> genWorkers(int num, int slotPerWorker) {
+    List<WorkerInfo> workers = new ArrayList<>(num);
+    for (int i = 1; i <= num; i++) {
+      workers.add(new WorkerInfo("host" + i, 9, 10, 110, 113, slotPerWorker, null));
+    }
+    return workers;
+  }
+
+  private List<Integer> genReduceIds(int num) {
+    List<Integer> res = new ArrayList(num);
+    for (int i = 0; i < num; i++) {
+      res.add(i);
+    }
+    return res;
   }
 
   private void check(
@@ -96,10 +231,11 @@ public class MasterUtilSuiteJ {
       List<WorkerInfo> workers,
       List<Integer> reduceIds,
       boolean shouldReplicate,
-      boolean expectSuccess) {
+      boolean expectSuccess,
+      RssConf rssConf) {
     String shuffleKey = "appId-1";
     Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> slots =
-        MasterUtil.offerSlots(shuffleKey, workers, reduceIds, shouldReplicate);
+        MasterUtil.offerSlots(shuffleKey, workers, reduceIds, shouldReplicate, rssConf);
 
     if (expectSuccess) {
       assert usedWorkers == slots.size() : "Offer slots, expect to return "

Reply via email to