This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 0108a94c5b2f YARN-11684. Fix general contract violation in PriorityQueueComparator. (#6753) Contributed by Tamas Domok.
0108a94c5b2f is described below

commit 0108a94c5b2f7015b3de539d55bb8784e77d1df7
Author: Tamas Domok <tdo...@cloudera.com>
AuthorDate: Thu Apr 25 17:52:12 2024 +0200

    YARN-11684. Fix general contract violation in PriorityQueueComparator. (#6753) Contributed by Tamas Domok.
    
    Signed-off-by: Shilun Fan <slfan1...@apache.org>
---
 .../PriorityUtilizationQueueOrderingPolicy.java    | 78 +++++++++----------
 ...TestPriorityUtilizationQueueOrderingPolicy.java | 91 ++++++++++++++++++++++
 2 files changed, 128 insertions(+), 41 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java
index f60208e04849..5da54e1ec6c6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy;
 
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels
     .RMNodeLabelsManager;
@@ -32,7 +33,6 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 /**
@@ -54,17 +54,7 @@ import java.util.stream.Collectors;
 public class PriorityUtilizationQueueOrderingPolicy
     implements QueueOrderingPolicy {
   private List<CSQueue> queues;
-  private boolean respectPriority;
-
-  // This makes multiple threads can sort queues at the same time
-  // For different partitions.
-  private static ThreadLocal<String> partitionToLookAt =
-      ThreadLocal.withInitial(new Supplier<String>() {
-        @Override
-        public String get() {
-          return RMNodeLabelsManager.NO_LABEL;
-        }
-      });
+  private final boolean respectPriority;
 
   /**
    * Compare two queues with possibly different priority and assigned capacity,
@@ -101,15 +91,21 @@ public class PriorityUtilizationQueueOrderingPolicy
   /**
    * Comparator that both looks at priority and utilization
    */
-  private class PriorityQueueComparator
+  final private class PriorityQueueComparator
       implements Comparator<PriorityQueueResourcesForSorting> {
 
+    final private String partition;
+
+    private PriorityQueueComparator(String partition) {
+      this.partition = partition;
+    }
+
     @Override
     public int compare(PriorityQueueResourcesForSorting q1Sort,
         PriorityQueueResourcesForSorting q2Sort) {
-      String p = partitionToLookAt.get();
-
-      int rc = compareQueueAccessToPartition(q1Sort.queue, q2Sort.queue, p);
+      int rc = compareQueueAccessToPartition(
+          q1Sort.nodeLabelAccessible,
+          q2Sort.nodeLabelAccessible);
       if (0 != rc) {
         return rc;
       }
@@ -133,8 +129,8 @@ public class PriorityUtilizationQueueOrderingPolicy
         float used2 = q2Sort.absoluteUsedCapacity;
 
         return compare(q1Sort, q2Sort, used1, used2,
-            q1Sort.queue.getPriority().
-                getPriority(), q2Sort.queue.getPriority().getPriority());
+            q1Sort.priority.
+                getPriority(), q2Sort.priority.getPriority());
       } else{
         // both q1 has positive abs capacity and q2 has positive abs
         // capacity
@@ -142,8 +138,8 @@ public class PriorityUtilizationQueueOrderingPolicy
         float used2 = q2Sort.usedCapacity;
 
         return compare(q1Sort, q2Sort, used1, used2,
-            q1Sort.queue.getPriority().getPriority(),
-            q2Sort.queue.getPriority().getPriority());
+            q1Sort.priority.getPriority(),
+            q2Sort.priority.getPriority());
       }
     }
 
@@ -181,8 +177,7 @@ public class PriorityUtilizationQueueOrderingPolicy
       return rc;
     }
 
-    private int compareQueueAccessToPartition(CSQueue q1, CSQueue q2,
-        String partition) {
+    private int compareQueueAccessToPartition(boolean q1Accessible, boolean q2Accessible) {
       // Everybody has access to default partition
       if (StringUtils.equals(partition, RMNodeLabelsManager.NO_LABEL)) {
         return 0;
@@ -192,14 +187,6 @@ public class PriorityUtilizationQueueOrderingPolicy
        * Check accessible to given partition, if one queue accessible and
        * the other not, accessible queue goes first.
        */
-      boolean q1Accessible =
-          q1.getAccessibleNodeLabels() != null && q1.getAccessibleNodeLabels()
-              .contains(partition) || q1.getAccessibleNodeLabels().contains(
-              RMNodeLabelsManager.ANY);
-      boolean q2Accessible =
-          q2.getAccessibleNodeLabels() != null && q2.getAccessibleNodeLabels()
-              .contains(partition) || q2.getAccessibleNodeLabels().contains(
-              RMNodeLabelsManager.ANY);
       if (q1Accessible && !q2Accessible) {
         return -1;
       } else if (!q1Accessible && q2Accessible) {
@@ -218,22 +205,32 @@ public class PriorityUtilizationQueueOrderingPolicy
     private final float usedCapacity;
     private final Resource configuredMinResource;
     private final float absoluteCapacity;
+    private final Priority priority;
+    private final boolean nodeLabelAccessible;
     private final CSQueue queue;
 
-    PriorityQueueResourcesForSorting(CSQueue queue) {
+    PriorityQueueResourcesForSorting(CSQueue queue, String partition) {
       this.queue = queue;
       this.absoluteUsedCapacity =
           queue.getQueueCapacities().
-              getAbsoluteUsedCapacity(partitionToLookAt.get());
+              getAbsoluteUsedCapacity(partition);
       this.usedCapacity =
           queue.getQueueCapacities().
-              getUsedCapacity(partitionToLookAt.get());
+              getUsedCapacity(partition);
       this.absoluteCapacity =
           queue.getQueueCapacities().
-              getAbsoluteCapacity(partitionToLookAt.get());
+              getAbsoluteCapacity(partition);
       this.configuredMinResource =
           queue.getQueueResourceQuotas().
-              getConfiguredMinResource(partitionToLookAt.get());
+              getConfiguredMinResource(partition);
+      this.priority = queue.getPriority();
+      this.nodeLabelAccessible = queue.getAccessibleNodeLabels() != null &&
+          (queue.getAccessibleNodeLabels().contains(partition) ||
+              queue.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY));
+    }
+
+    static PriorityQueueResourcesForSorting create(CSQueue queue, String partition) {
+      return new PriorityQueueResourcesForSorting(queue, partition);
     }
 
     public CSQueue getQueue() {
@@ -252,14 +249,13 @@ public class PriorityUtilizationQueueOrderingPolicy
 
   @Override
   public Iterator<CSQueue> getAssignmentIterator(String partition) {
-    // partitionToLookAt is a thread local variable, therefore it is safe to mutate it.
-    PriorityUtilizationQueueOrderingPolicy.partitionToLookAt.set(partition);
-
     // Copy (for thread safety) and sort the snapshot of the queues in order to avoid breaking
     // the prerequisites of TimSort. See YARN-10178 for details.
-    return new ArrayList<>(queues).stream().map(PriorityQueueResourcesForSorting::new).sorted(
-        new PriorityQueueComparator()).map(PriorityQueueResourcesForSorting::getQueue).collect(
-            Collectors.toList()).iterator();
+    return new ArrayList<>(queues).stream()
+        .map(queue -> PriorityQueueResourcesForSorting.create(queue, partition))
+        .sorted(new PriorityQueueComparator(partition))
+        .map(PriorityQueueResourcesForSorting::getQueue)
+        .collect(Collectors.toList()).iterator();
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/TestPriorityUtilizationQueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/TestPriorityUtilizationQueueOrderingPolicy.java
index 4eea51e81972..dad888de0f11 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/TestPriorityUtilizationQueueOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/TestPriorityUtilizationQueueOrderingPolicy.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
 
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
@@ -28,9 +29,13 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
 
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -250,4 +255,90 @@ public class TestPriorityUtilizationQueueOrderingPolicy {
     verifyOrder(policy, "x", new String[] { "e", "c", "d", "b", "a" });
 
   }
+
+  @Test
+  public void testComparatorDoesNotValidateGeneralContract() {
+    final String[] nodeLabels = {"x", "y", "z"};
+    PriorityUtilizationQueueOrderingPolicy policy =
+        new PriorityUtilizationQueueOrderingPolicy(true);
+
+    final String partition = nodeLabels[randInt(0, nodeLabels.length - 1)];
+    List<CSQueue> list = new ArrayList<>();
+    for (int i = 0; i < 1000; i++) {
+      CSQueue q = mock(CSQueue.class);
+      when(q.getQueuePath()).thenReturn(String.format("%d", i));
+
+      // simulating change in queueCapacities
+      when(q.getQueueCapacities())
+          .thenReturn(randomQueueCapacities(partition))
+          .thenReturn(randomQueueCapacities(partition))
+          .thenReturn(randomQueueCapacities(partition))
+          .thenReturn(randomQueueCapacities(partition))
+          .thenReturn(randomQueueCapacities(partition));
+
+      // simulating change in the priority
+      when(q.getPriority())
+          .thenReturn(Priority.newInstance(randInt(0, 10)))
+          .thenReturn(Priority.newInstance(randInt(0, 10)))
+          .thenReturn(Priority.newInstance(randInt(0, 10)))
+          .thenReturn(Priority.newInstance(randInt(0, 10)))
+          .thenReturn(Priority.newInstance(randInt(0, 10)));
+
+      if (randInt(0, nodeLabels.length) == 1) {
+        // simulating change in nodeLabels
+        when(q.getAccessibleNodeLabels())
+            .thenReturn(randomNodeLabels(nodeLabels))
+            .thenReturn(randomNodeLabels(nodeLabels))
+            .thenReturn(randomNodeLabels(nodeLabels))
+            .thenReturn(randomNodeLabels(nodeLabels))
+            .thenReturn(randomNodeLabels(nodeLabels));
+      }
+
+      // simulating change in configuredMinResource
+      when(q.getQueueResourceQuotas())
+          .thenReturn(randomResourceQuotas(partition))
+          .thenReturn(randomResourceQuotas(partition))
+          .thenReturn(randomResourceQuotas(partition))
+          .thenReturn(randomResourceQuotas(partition))
+          .thenReturn(randomResourceQuotas(partition));
+      list.add(q);
+    }
+
+    policy.setQueues(list);
+    // java.lang.IllegalArgumentException: Comparison method violates its general contract!
+    assertDoesNotThrow(() -> policy.getAssignmentIterator(partition));
+  }
+
+  private QueueCapacities randomQueueCapacities(String partition) {
+    QueueCapacities qc = new QueueCapacities(false);
+    qc.setAbsoluteCapacity(partition, (float) randFloat(0.0d, 100.0d));
+    qc.setUsedCapacity(partition, (float) randFloat(0.0d, 100.0d));
+    qc.setAbsoluteUsedCapacity(partition, (float) randFloat(0.0d, 100.0d));
+    return qc;
+  }
+
+  private Set<String> randomNodeLabels(String[] availableNodeLabels) {
+    Set<String> nodeLabels = new HashSet<>();
+    for (String label : availableNodeLabels) {
+      if (randInt(0, 1) == 1) {
+        nodeLabels.add(label);
+      }
+    }
+    return nodeLabels;
+  }
+
+  private QueueResourceQuotas randomResourceQuotas(String partition) {
+    QueueResourceQuotas qr = new QueueResourceQuotas();
+    qr.setConfiguredMinResource(partition,
+        Resource.newInstance(randInt(1, 10) * 1024, randInt(1, 10)));
+    return qr;
+  }
+
+  private static double randFloat(double min, double max) {
+    return min + ThreadLocalRandom.current().nextFloat() * (max - min);
+  }
+
+  private static int randInt(int min, int max) {
+    return ThreadLocalRandom.current().nextInt(min, max + 1);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to