mridulm commented on code in PR #3347:
URL: https://github.com/apache/celeborn/pull/3347#discussion_r2167233186


##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -2932,6 +2936,25 @@ object CelebornConf extends Logging {
         SlotsAssignPolicy.LOADAWARE.name))
       .createWithDefault(SlotsAssignPolicy.ROUNDROBIN.name)
 
+  val MASTER_SLOT_ASSIGN_INTERRUPTION_AWARE: ConfigEntry[Boolean] =
+    buildConf("celeborn.master.slot.assign.interruptionAware")
+      .categories("master")
+      .version("0.6.0")
+      .doc("If this is set to true, Celeborn master will prioritize partition placement on workers that are not " +
+        "in scope for maintenance soon.")
+      .version("0.6.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  val MASTER_SLOT_INTERRUPTION_AWARE_THRESHOLD: ConfigEntry[Int] =
+    buildConf("celeborn.master.slot.assign.interruptionAware.threshold")
+      .categories("master")
+      .doc("This controls what percentage of hosts would be selected for slot selection in the first iteration " +
+        "of creating partitions. Default is 50%.")
+      .version("0.6.0")
+      .intConf

Review Comment:
   Change the version for all of these to 0.7?
   It is too late for 0.6, and adding this in a patch release is probably not a good idea.



##########
master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java:
##########
@@ -344,19 +484,26 @@ static List<WorkerInfo> generateRackAwareWorkers(List<WorkerInfo> workers) {
   private static List<Integer> roundRobin(
       Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> slots,
       List<Integer> partitionIds,
-      List<WorkerInfo> workers,
+      List<WorkerInfo> primaryWorkers,
+      List<WorkerInfo> replicaWorkers,
       Map<WorkerInfo, List<UsableDiskInfo>> slotsRestrictions,
       boolean shouldReplicate,
       boolean shouldRackAware,
-      int availableStorageTypes) {
+      int availableStorageTypes,
+      boolean replicaSameAsPrimary) {
+    if (primaryWorkers.isEmpty() || replicaWorkers.isEmpty()) {
+      return partitionIds;
+    }
     // workerInfo -> (diskIndexForPrimaryAndReplica)
     Map<WorkerInfo, Integer> workerDiskIndex = new HashMap<>();
     List<Integer> partitionIdList = new LinkedList<>(partitionIds);
 
-    final int workerSize = workers.size();
-    final IntUnaryOperator incrementIndex = v -> (v + 1) % workerSize;
-    int primaryIndex = rand.nextInt(workerSize);
-    int replicaIndex = rand.nextInt(workerSize);
+    final int primaryWorkersSize = primaryWorkers.size();
+    final int replicaWorkersSize = replicaWorkers.size();
+    final IntUnaryOperator primaryWorkersIncrementIndex = v -> (v + 1) % primaryWorkersSize;
+    final IntUnaryOperator replicaWorkersIncrementIndex = v -> (v + 1) % replicaWorkersSize;
+    int primaryIndex = rand.nextInt(primaryWorkersSize);
+    int replicaIndex = rand.nextInt(replicaWorkersSize);

Review Comment:
   Compute `replicaIndex` here only when `shouldReplicate` is true; otherwise `replicaWorkers` can be empty, resulting in a div-by-zero.
   
   We need to add a test to catch this case.
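
   A minimal sketch of the guard suggested above, not the PR's actual code. `StartIndexSketch` and `pickStartIndices` are hypothetical names, workers are stood in for by strings, and the `-1` sentinel is an assumption made for illustration. The replica start index is drawn only when `shouldReplicate` is true, because `Random.nextInt(0)` throws `IllegalArgumentException` and `v % 0` throws `ArithmeticException` when the replica list is empty. The `main` method exercises the case the requested test would need to cover: replication off with an empty `replicaWorkers`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical helper, not part of SlotsAllocator: it illustrates the guard only.
final class StartIndexSketch {
  private static final Random rand = new Random();

  /** Returns {primaryIndex, replicaIndex}; replicaIndex is -1 when replication is off. */
  static int[] pickStartIndices(
      List<String> primaryWorkers, List<String> replicaWorkers, boolean shouldReplicate) {
    // primaryWorkers is assumed non-empty here (the caller returns early otherwise).
    int primaryIndex = rand.nextInt(primaryWorkers.size());
    // Only touch replicaWorkers.size() when replicas are actually requested.
    int replicaIndex = shouldReplicate ? rand.nextInt(replicaWorkers.size()) : -1;
    return new int[] {primaryIndex, replicaIndex};
  }

  public static void main(String[] args) {
    // Replication disabled and no replica candidates: must not throw.
    int[] idx =
        pickStartIndices(Arrays.asList("w1", "w2"), Collections.<String>emptyList(), false);
    System.out.println("primary=" + idx[0] + ", replica=" + idx[1]);
  }
}
```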



##########
master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java:
##########
@@ -246,11 +267,60 @@ private static StorageInfo getStorageInfo(
     return storageInfo;
   }
 
+  /**
+   * If interruptionAware = true, select workers based on 2 main criteria: <br>
+   * 1. Workers that have no nextInterruptionNotice are the first priority and are included in the
+   * 1st pass for slot selection. <br>
+   * 2. Workers that have a later interruption notice are a little less deprioritized, and are
+   * included in the 2nd pass for slot selection. This is determined by nextInterruptionNotice above
+   * a certain percentage threshold.<br>
+   * All other workers are considered least priority, and are only included for slot selection in
+   * the worst case. <br>
+   */
+  static Tuple3<List<WorkerInfo>, List<WorkerInfo>, List<WorkerInfo>>
+      prioritizeWorkersBasedOnInterruptionNotice(
+          List<WorkerInfo> workers,
+          boolean shouldReplicate,
+          boolean shouldRackAware,
+          double percentileThreshold) {
+    Map<Boolean, List<WorkerInfo>> partitioned =
+        workers.stream().collect(Collectors.partitioningBy(WorkerInfo::hasInterruptionNotice));
+    List<WorkerInfo> workersWithInterruptions = partitioned.get(true);
+    List<WorkerInfo> workersWithoutInterruptions = partitioned.get(false);
+    // Timestamps towards the boundary of `percentileThreshold` might be the same. Given this
+    // is a stable sort, it makes sense to randomize these hosts so that the same hosts are not
+    // consistently selected.
+    Collections.shuffle(workersWithInterruptions);
+    workersWithInterruptions.sort(
+        Comparator.comparingLong(WorkerInfo::nextInterruptionNotice).reversed());
+    int requiredNodes =
+        (int) Math.floor((percentileThreshold * workersWithInterruptions.size()) / 100.0);
+
+    List<WorkerInfo> workersWithLateInterruptions =
+        new ArrayList<>(workersWithInterruptions.subList(0, requiredNodes));
+    List<WorkerInfo> workersWithEarlyInterruptions =
+        new ArrayList<>(
+            workersWithInterruptions.subList(requiredNodes, workersWithInterruptions.size()));
+    if (shouldReplicate && shouldRackAware) {
+      return Tuple3.apply(
+          generateRackAwareWorkers(workersWithoutInterruptions),
+          generateRackAwareWorkers(workersWithLateInterruptions),

Review Comment:
   We don't need to call `generateRackAwareWorkers` here for `workersWithLateInterruptions`, right?
   We always do that below in `locateSlots` where relevant.



##########
master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java:
##########
@@ -344,19 +484,26 @@ static List<WorkerInfo> generateRackAwareWorkers(List<WorkerInfo> workers) {
   private static List<Integer> roundRobin(
       Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> slots,
       List<Integer> partitionIds,
-      List<WorkerInfo> workers,
+      List<WorkerInfo> primaryWorkers,
+      List<WorkerInfo> replicaWorkers,
       Map<WorkerInfo, List<UsableDiskInfo>> slotsRestrictions,
       boolean shouldReplicate,
       boolean shouldRackAware,
-      int availableStorageTypes) {
+      int availableStorageTypes,
+      boolean replicaSameAsPrimary) {
+    if (primaryWorkers.isEmpty() || replicaWorkers.isEmpty()) {

Review Comment:
   Check `replicaWorkers.isEmpty` only when `shouldReplicate == true`?
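
   One way the early-return condition could look, as a sketch under assumptions rather than the PR's code: `EmptyWorkersGuardSketch` and `nothingToAllocate` are hypothetical names, and strings stand in for `WorkerInfo`. An empty `replicaWorkers` list only blocks allocation when replication is requested.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of SlotsAllocator: it isolates the guard condition.
final class EmptyWorkersGuardSketch {
  /** True when a roundRobin-style allocation has no usable workers and should return early. */
  static boolean nothingToAllocate(
      List<String> primaryWorkers, List<String> replicaWorkers, boolean shouldReplicate) {
    // replicaWorkers matters only if replicas will actually be placed.
    return primaryWorkers.isEmpty() || (shouldReplicate && replicaWorkers.isEmpty());
  }

  public static void main(String[] args) {
    List<String> primaries = Arrays.asList("w1", "w2");
    List<String> none = Collections.<String>emptyList();
    System.out.println(nothingToAllocate(primaries, none, false)); // false: replicas not needed
    System.out.println(nothingToAllocate(primaries, none, true)); // true: replicas needed, none available
  }
}
```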



##########
master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java:
##########
@@ -344,19 +484,26 @@ static List<WorkerInfo> generateRackAwareWorkers(List<WorkerInfo> workers) {
   private static List<Integer> roundRobin(
       Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> slots,
       List<Integer> partitionIds,
-      List<WorkerInfo> workers,
+      List<WorkerInfo> primaryWorkers,
+      List<WorkerInfo> replicaWorkers,
       Map<WorkerInfo, List<UsableDiskInfo>> slotsRestrictions,
       boolean shouldReplicate,
       boolean shouldRackAware,
-      int availableStorageTypes) {
+      int availableStorageTypes,
+      boolean replicaSameAsPrimary) {

Review Comment:
   Given that the number of parameters has increased, and some of them are not obvious on first read, can you update the javadoc as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
