This is an automated email from the ASF dual-hosted git repository.
slfan1989 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new a386ac1f5632 YARN-11684. Fix general contract violation in
PriorityQueueComparator. (#6725) Contributed by Tamas Domok.
a386ac1f5632 is described below
commit a386ac1f5632949bc49a9d09195073ba5f980b5c
Author: Tamas Domok
AuthorDate: Fri Apr 19 02:37:05 2024 +0200
YARN-11684. Fix general contract violation in PriorityQueueComparator.
(#6725) Contributed by Tamas Domok.
Signed-off-by: Shilun Fan
---
.../PriorityUtilizationQueueOrderingPolicy.java| 78 +--
...TestPriorityUtilizationQueueOrderingPolicy.java | 91 ++
2 files changed, 128 insertions(+), 41 deletions(-)
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java
index f60208e04849..5da54e1ec6c6 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java
@@ -20,6 +20,7 @@ package
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels
.RMNodeLabelsManager;
@@ -32,7 +33,6 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
@@ -54,17 +54,7 @@ import java.util.stream.Collectors;
public class PriorityUtilizationQueueOrderingPolicy
implements QueueOrderingPolicy {
private List queues;
- private boolean respectPriority;
-
- // This makes multiple threads can sort queues at the same time
- // For different partitions.
- private static ThreadLocal partitionToLookAt =
- ThreadLocal.withInitial(new Supplier() {
-@Override
-public String get() {
- return RMNodeLabelsManager.NO_LABEL;
-}
- });
+ private final boolean respectPriority;
/**
* Compare two queues with possibly different priority and assigned capacity,
@@ -101,15 +91,21 @@ public class PriorityUtilizationQueueOrderingPolicy
/**
* Comparator that both looks at priority and utilization
*/
- private class PriorityQueueComparator
+ final private class PriorityQueueComparator
implements Comparator {
+final private String partition;
+
+private PriorityQueueComparator(String partition) {
+ this.partition = partition;
+}
+
@Override
public int compare(PriorityQueueResourcesForSorting q1Sort,
PriorityQueueResourcesForSorting q2Sort) {
- String p = partitionToLookAt.get();
-
- int rc = compareQueueAccessToPartition(q1Sort.queue, q2Sort.queue, p);
+ int rc = compareQueueAccessToPartition(
+ q1Sort.nodeLabelAccessible,
+ q2Sort.nodeLabelAccessible);
if (0 != rc) {
return rc;
}
@@ -133,8 +129,8 @@ public class PriorityUtilizationQueueOrderingPolicy
float used2 = q2Sort.absoluteUsedCapacity;
return compare(q1Sort, q2Sort, used1, used2,
-q1Sort.queue.getPriority().
-getPriority(), q2Sort.queue.getPriority().getPriority());
+q1Sort.priority.
+getPriority(), q2Sort.priority.getPriority());
} else{
// both q1 has positive abs capacity and q2 has positive abs
// capacity
@@ -142,8 +138,8 @@ public class PriorityUtilizationQueueOrderingPolicy
float used2 = q2Sort.usedCapacity;
return compare(q1Sort, q2Sort, used1, used2,
-q1Sort.queue.getPriority().getPriority(),
-q2Sort.queue.getPriority().getPriority());
+q1Sort.priority.getPriority(),
+q2Sort.priority.getPriority());
}
}
@@ -181,8 +177,7 @@ public class PriorityUtilizationQueueOrderingPolicy
return rc;
}
-private int compareQueueAccessToPartition(CSQueue q1, CSQueue q2,
-Str