http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java new file mode 100644 index 0000000..fe60611 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.function.Supplier; + +/** + * For two queues with the same priority: + * - The queue with less relative used-capacity goes first - todayâs behavior. + * - The default priority for all queues is 0 and equal. So, we get todayâs + * behaviour at every level - the queue with the lowest used-capacity + * percentage gets the resources + * + * For two queues with different priorities: + * - Both the queues are under their guaranteed capacities: The queue with + * the higher priority gets resources + * - Both the queues are over or meeting their guaranteed capacities: + * The queue with the higher priority gets resources + * - One of the queues is over or meeting their guaranteed capacities and the + * other is under: The queue that is under its capacity guarantee gets the + * resources. + */ +public class PriorityUtilizationQueueOrderingPolicy implements QueueOrderingPolicy { + private List<CSQueue> queues; + private boolean respectPriority; + + // This makes multiple threads can sort queues at the same time + // For different partitions. + private static ThreadLocal<String> partitionToLookAt = + ThreadLocal.withInitial(new Supplier<String>() { + @Override + public String get() { + return RMNodeLabelsManager.NO_LABEL; + } + }); + + /** + * Compare two queues with possibly different priority and assigned capacity, + * Will be used by preemption policy as well. 
+ * + * @param relativeAssigned1 relativeAssigned1 + * @param relativeAssigned2 relativeAssigned2 + * @param priority1 p1 + * @param priority2 p2 + * @return compared result + */ + public static int compare(double relativeAssigned1, double relativeAssigned2, + int priority1, int priority2) { + if (priority1 == priority2) { + // The queue with less relative used-capacity goes first + return Double.compare(relativeAssigned1, relativeAssigned2); + } else { + // When priority is different: + if ((relativeAssigned1 < 1.0f && relativeAssigned2 < 1.0f) || ( + relativeAssigned1 >= 1.0f && relativeAssigned2 >= 1.0f)) { + // When both the queues are under their guaranteed capacities, + // Or both the queues are over or meeting their guaranteed capacities + // queue with higher used-capacity goes first + return Integer.compare(priority2, priority1); + } else { + // Otherwise, when one of the queues is over or meeting their + // guaranteed capacities and the other is under: The queue that is + // under its capacity guarantee gets the resources. 
+ return Double.compare(relativeAssigned1, relativeAssigned2); + } + } + } + + /** + * Comparator that both looks at priority and utilization + */ + private class PriorityQueueComparator implements Comparator<CSQueue> { + + @Override + public int compare(CSQueue q1, CSQueue q2) { + String p = partitionToLookAt.get(); + + int rc = compareQueueAccessToPartition(q1, q2, p); + if (0 != rc) { + return rc; + } + + float used1 = q1.getQueueCapacities().getUsedCapacity(p); + float used2 = q2.getQueueCapacities().getUsedCapacity(p); + int p1 = 0; + int p2 = 0; + if (respectPriority) { + p1 = q1.getPriority().getPriority(); + p2 = q2.getPriority().getPriority(); + } + + rc = PriorityUtilizationQueueOrderingPolicy.compare(used1, used2, p1, p2); + + // For queue with same used ratio / priority, queue with higher configured + // capacity goes first + if (0 == rc) { + float abs1 = q1.getQueueCapacities().getAbsoluteCapacity(p); + float abs2 = q2.getQueueCapacities().getAbsoluteCapacity(p); + return Float.compare(abs2, abs1); + } + + return rc; + } + + private int compareQueueAccessToPartition(CSQueue q1, CSQueue q2, String partition) { + // Everybody has access to default partition + if (StringUtils.equals(partition, RMNodeLabelsManager.NO_LABEL)) { + return 0; + } + + /* + * Check accessible to given partition, if one queue accessible and + * the other not, accessible queue goes first. 
+ */ + boolean q1Accessible = + q1.getAccessibleNodeLabels() != null && q1.getAccessibleNodeLabels() + .contains(partition) || q1.getAccessibleNodeLabels().contains( + RMNodeLabelsManager.ANY); + boolean q2Accessible = + q2.getAccessibleNodeLabels() != null && q2.getAccessibleNodeLabels() + .contains(partition) || q2.getAccessibleNodeLabels().contains( + RMNodeLabelsManager.ANY); + if (q1Accessible && !q2Accessible) { + return -1; + } else if (!q1Accessible && q2Accessible) { + return 1; + } + + return 0; + } + } + + public PriorityUtilizationQueueOrderingPolicy(boolean respectPriority) { + this.respectPriority = respectPriority; + } + + @Override + public void setQueues(List<CSQueue> queues) { + this.queues = queues; + } + + @Override + public Iterator<CSQueue> getAssignmentIterator(String partition) { + // Since partitionToLookAt is a thread local variable, and every time we + // copy and sort queues, so it's safe for multi-threading environment. + PriorityUtilizationQueueOrderingPolicy.partitionToLookAt.set(partition); + List<CSQueue> sortedQueue = new ArrayList<>(queues); + Collections.sort(sortedQueue, new PriorityQueueComparator()); + return sortedQueue.iterator(); + } + + @Override + public String getConfigName() { + if (respectPriority) { + return CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY; + } else{ + return CapacitySchedulerConfiguration.QUEUE_UTILIZATION_ORDERING_POLICY; + } + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/QueueOrderingPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/QueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/QueueOrderingPolicy.java new file mode 100644 index 0000000..a434ab0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/QueueOrderingPolicy.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy;

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;

import java.util.Iterator;
import java.util.List;

/**
 * This will be used by
 * {@link org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue}
 * to decide allocation ordering of child queues.
 */
public interface QueueOrderingPolicy {
  /**
   * Sets the child queues that this policy orders.
   *
   * @param queues list of child queues
   */
  void setQueues(List<CSQueue> queues);

  /**
   * Return an iterator over the collection of CSQueues which orders
   * them for container assignment.
   *
   * Please note that, to avoid the queue set being updated during sorting /
   * iterating, the caller needs to make sure the parent queue's read lock is
   * properly acquired.
   *
   * @param partition nodePartition
   *
   * @return iterator of queues to allocate
   */
  Iterator<CSQueue> getAssignmentIterator(String partition);

  /**
   * Returns the configuration name (which will be used to set the ordering
   * policy).
   *
   * @return configuration name
   */
  String getConfigName();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 4329335..5e3b9be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -98,13 +99,13 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { private final Set<ContainerId> containersToPreempt = new HashSet<ContainerId>(); - + private CapacityHeadroomProvider headroomProvider; private ResourceCalculator rc = new DefaultResourceCalculator(); private ResourceScheduler scheduler; - + private AbstractContainerAllocator containerAllocator; /** @@ -115,7 +116,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { private Map<ContainerId, SchedContainerChangeRequest> toBeRemovedIncRequests = new ConcurrentHashMap<>(); - public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, + public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext) { this(applicationAttemptId, user, queue, activeUsersManager, rmContext, @@ -831,7 +832,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { } return null; } - + public void setHeadroomProvider( CapacityHeadroomProvider headroomProvider) { try { @@ -841,7 +842,7 @@ public class 
FiCaSchedulerApp extends SchedulerApplicationAttempt { writeLock.unlock(); } } - + @Override public Resource getHeadroom() { try { @@ -855,7 +856,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { } } - + @Override public void transferStateFromPreviousAttempt( SchedulerApplicationAttempt appAttempt) { @@ -867,7 +868,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { writeLock.unlock(); } } - + public boolean reserveIncreasedContainer(SchedulerRequestKey schedulerKey, FiCaSchedulerNode node, RMContainer rmContainer, Resource reservedResource) { @@ -1148,4 +1149,85 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { public boolean equals(Object o) { return super.equals(o); } + + /** + * Move reservation from one node to another + * Comparing to unreserve container on source node and reserve a new + * container on target node. This method will not create new RMContainer + * instance. And this operation is atomic. + * + * @param reservedContainer to be moved reserved container + * @param sourceNode source node + * @param targetNode target node + * + * @return succeeded or not + */ + public boolean moveReservation(RMContainer reservedContainer, + FiCaSchedulerNode sourceNode, FiCaSchedulerNode targetNode) { + try { + writeLock.lock(); + if (!sourceNode.getPartition().equals(targetNode.getPartition())) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Failed to move reservation, two nodes are in different partition"); + } + return false; + } + + // Update reserved container to node map + Map<NodeId, RMContainer> map = reservedContainers.get( + reservedContainer.getReservedSchedulerKey()); + if (null == map) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cannot find reserved container map."); + } + return false; + } + + // Check if reserved container changed + if (sourceNode.getReservedContainer() != reservedContainer) { + if (LOG.isDebugEnabled()) { + LOG.debug("To-be-moved container already updated."); + } + 
return false; + } + + // Check if target node is empty, acquires lock of target node to make sure + // reservation happens transactional + synchronized (targetNode){ + if (targetNode.getReservedContainer() != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Target node is already occupied before moving"); + } + } + + try { + targetNode.reserveResource(this, + reservedContainer.getReservedSchedulerKey(), reservedContainer); + } catch (IllegalStateException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Reserve on target node failed, e=", e); + } + return false; + } + + // Set source node's reserved container to null + sourceNode.setReservedContainer(null); + map.remove(sourceNode.getNodeID()); + + // Update reserved container + reservedContainer.handle( + new RMContainerReservedEvent(reservedContainer.getContainerId(), + reservedContainer.getReservedResource(), targetNode.getNodeID(), + reservedContainer.getReservedSchedulerKey())); + + // Add to target node + map.put(targetNode.getNodeID(), reservedContainer); + + return true; + } + } finally { + writeLock.unlock(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java index a09a33c..b2d7d16 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; @@ -61,6 +62,7 @@ import org.mockito.stubbing.Answer; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -476,6 +478,14 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { * -B... 
* </pre> * ";" splits queues, and there should no empty lines, no extra spaces + * + * For each queue, it has configurations to specify capacities (to each + * partition), format is: + * <pre> + * -<queueName> (<labelName1>=[guaranteed max used pending], \ + * <labelName2>=[guaranteed max used pending]) + * {key1=value1,key2=value2}; // Additional configs + * </pre> */ @SuppressWarnings({ "unchecked", "rawtypes" }) private ParentQueue mockQueueHierarchy(String queueExprs) { @@ -491,6 +501,10 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { queue = parentQueue; List<CSQueue> children = new ArrayList<CSQueue>(); when(parentQueue.getChildQueues()).thenReturn(children); + QueueOrderingPolicy policy = mock(QueueOrderingPolicy.class); + when(policy.getConfigName()).thenReturn( + CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY); + when(parentQueue.getQueueOrderingPolicy()).thenReturn(policy); } else { LeafQueue leafQueue = mock(LeafQueue.class); final TreeSet<FiCaSchedulerApp> apps = new TreeSet<>( @@ -625,11 +639,57 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { when(queue.getPreemptionDisabled()).thenReturn( conf.getPreemptionDisabled(queuePath, false)); + // Setup other queue configurations + Map<String, String> otherConfigs = getOtherConfigurations( + queueExprArray[idx]); + if (otherConfigs.containsKey("priority")) { + when(queue.getPriority()).thenReturn( + Priority.newInstance(Integer.valueOf(otherConfigs.get("priority")))); + } else { + // set queue's priority to 0 by default + when(queue.getPriority()).thenReturn(Priority.newInstance(0)); + } + + // Setup disable preemption of queues + if (otherConfigs.containsKey("disable_preemption")) { + when(queue.getPreemptionDisabled()).thenReturn( + Boolean.valueOf(otherConfigs.get("disable_preemption"))); + } + nameToCSQueues.put(queueName, queue); when(cs.getQueue(eq(queueName))).thenReturn(queue); } /** + * Get additional queue's configurations + * @param 
queueExpr queue expr + * @return maps of configs + */ + private Map<String, String> getOtherConfigurations(String queueExpr) { + if (queueExpr.contains("{")) { + int left = queueExpr.indexOf('{'); + int right = queueExpr.indexOf('}'); + + if (right > left) { + Map<String, String> configs = new HashMap<>(); + + String subStr = queueExpr.substring(left + 1, right); + for (String kv : subStr.split(",")) { + if (kv.contains("=")) { + String key = kv.substring(0, kv.indexOf("=")); + String value = kv.substring(kv.indexOf("=") + 1); + configs.put(key, value); + } + } + + return configs; + } + } + + return Collections.EMPTY_MAP; + } + + /** * Level of a queue is how many "-" at beginning, root's level is 0 */ private int getLevel(String q) { @@ -739,6 +799,10 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { Assert.assertEquals(pending, ru.getPending(partition).getMemorySize()); } + public void checkPriority(CSQueue queue, int expectedPriority) { + Assert.assertEquals(expectedPriority, queue.getPriority().getPriority()); + } + public void checkReservedResource(CSQueue queue, String partition, int reserved) { ResourceUsage ru = queue.getQueueResourceUsage(); Assert.assertEquals(reserved, ru.getReserved(partition).getMemorySize()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java new file mode 100644 index 0000000..2b54d77 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java @@ -0,0 +1,361 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class TestPreemptionForQueueWithPriorities + extends ProportionalCapacityPreemptionPolicyMockFramework { + @Before + public void setup() { + super.setup(); + policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock); + } + + @Test + public void testPreemptionForHighestPriorityUnderutilizedQueue() + throws IOException { + /** + * The simplest test of queue with priorities, Queue structure is: + * + * <pre> + * root + * / | \ + * a b c + * </pre> + * + * For priorities + * - a=1 + * - b/c=2 + * + * So c will preempt more resource from a, till a reaches guaranteed + * resource. + */ + String labelsConfig = "=100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100]);" + //root + "-a(=[30 100 40 50]){priority=1};" + // a + "-b(=[30 100 59 50]){priority=2};" + // b + "-c(=[40 100 1 25]){priority=2}"; // c + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t(1,1,n1,,40,false);" + // app1 in a + "b\t(1,1,n1,,59,false);" + // app2 in b + "c\t(1,1,n1,,1,false);"; // app3 in c + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // 10 preempted from app1, 15 preempted from app2, and nothing preempted + // from app3 + verify(mDisp, times(10)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(15)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + verify(mDisp, never()).handle(argThat( + 
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } + + @Test + public void testPreemptionForLowestPriorityUnderutilizedQueue() + throws IOException { + /** + * Similar to above, make sure we can still make sure less utilized queue + * can get resource first regardless of priority. + * + * Queue structure is: + * + * <pre> + * root + * / | \ + * a b c + * </pre> + * + * For priorities + * - a=1 + * - b=2 + * - c=0 + * + * So c will preempt more resource from a, till a reaches guaranteed + * resource. + */ + String labelsConfig = "=100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100]);" + //root + "-a(=[30 100 40 50]){priority=1};" + // a + "-b(=[30 100 59 50]){priority=2};" + // b + "-c(=[40 100 1 25]){priority=0}"; // c + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t(1,1,n1,,40,false);" + // app1 in a + "b\t(1,1,n1,,59,false);" + // app2 in b + "c\t(1,1,n1,,1,false);"; // app3 in c + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // 10 preempted from app1, 15 preempted from app2, and nothing preempted + // from app3 + verify(mDisp, times(10)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(15)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } + + @Test + public void testPreemptionWontHappenBetweenSatisfiedQueues() + throws IOException { + /** + * No preemption happen if a queue is already satisfied, regardless of + * priority + * + * Queue structure is: + * + * <pre> + * root + * / | \ + * a b c + * </pre> + * + * For priorities + * 
- a=1 + * - b=1 + * - c=2 + * + * When c is satisfied, it will not preempt any resource from other queues + */ + String labelsConfig = "=100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100]);" + //root + "-a(=[30 100 0 0]){priority=1};" + // a + "-b(=[30 100 40 50]){priority=1};" + // b + "-c(=[40 100 60 25]){priority=2}"; // c + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "b\t(1,1,n1,,40,false);" + // app1 in b + "c\t(1,1,n1,,60,false)"; // app2 in c + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // Nothing preempted + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testPreemptionForMultipleQueuesInTheSamePriorityBuckets() + throws IOException { + /** + * When a cluster has different priorities, each priority has multiple + * queues, preemption policy should try to balance resource between queues + * with same priority by ratio of their capacities + * + * Queue structure is: + * + * <pre> + * root + * - a (capacity=10), p=1 + * - b (capacity=15), p=1 + * - c (capacity=20), p=2 + * - d (capacity=25), p=2 + * - e (capacity=30), p=2 + * </pre> + */ + String labelsConfig = "=100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100]);" + //root + "-a(=[10 100 35 50]){priority=1};" + // a + "-b(=[15 100 25 50]){priority=1};" + // b + "-c(=[20 100 39 50]){priority=2};" + // c + "-d(=[25 100 0 0]){priority=2};" + // d + "-e(=[30 100 1 99]){priority=2}"; // e + String appsConfig = + 
//queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t(1,1,n1,,35,false);" + // app1 in a + "b\t(1,1,n1,,25,false);" + // app2 in b + "c\t(1,1,n1,,39,false);" + // app3 in c + "e\t(1,1,n1,,1,false)"; // app4 in e + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // 23 preempted from app1, 6 preempted from app2, and nothing preempted + // from app3/app4 + // (After preemption, a has 35 - 23 = 12, b has 25 - 6 = 19, so a:b after + // preemption is 1.58, close to 1.50) + verify(mDisp, times(23)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(6)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(4)))); + } + + @Test + public void testPreemptionForPriorityAndDisablePreemption() + throws IOException { + /** + * When a cluster has different priorities, each priority has multiple + * queues, preemption policy should try to balance resource between queues + * with same priority by ratio of their capacities. + * + * But also we need to make sure preemption disable will be honored + * regardless of priority. 
+ * + * Queue structure is: + * + * <pre> + * root + * - a (capacity=10), p=1 + * - b (capacity=15), p=1 + * - c (capacity=20), p=2 + * - d (capacity=25), p=2 + * - e (capacity=30), p=2 + * </pre> + */ + String labelsConfig = "=100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100]);" + //root + "-a(=[10 100 35 50]){priority=1,disable_preemption=true};" + // a + "-b(=[15 100 25 50]){priority=1};" + // b + "-c(=[20 100 39 50]){priority=2};" + // c + "-d(=[25 100 0 0]){priority=2};" + // d + "-e(=[30 100 1 99]){priority=2}"; // e + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t(1,1,n1,,35,false);" + // app1 in a + "b\t(1,1,n1,,25,false);" + // app2 in b + "c\t(1,1,n1,,39,false);" + // app3 in c + "e\t(1,1,n1,,1,false)"; // app4 in e + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // We suppose to preempt some resource from A, but now since queueA + // disables preemption, so we need to preempt some resource from B and + // some from C even if C has higher priority than A + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(9)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + verify(mDisp, times(19)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(4)))); + } + + @Test + public void testPriorityPreemptionForHierarchicalOfQueues() + throws IOException { + /** + * When a queue has multiple hierarchy and different priorities: + * + * <pre> + * root + * - a (capacity=30), p=1 + * - a1 (capacity=40), p=1 + * - a2 
(capacity=60), p=1 + * - b (capacity=30), p=1 + * - b1 (capacity=50), p=1 + * - b2 (capacity=50), p=2 + * - c (capacity=40), p=2 + * </pre> + */ + String labelsConfig = "=100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100]);" + //root + "-a(=[30 100 40 50]){priority=1};" + // a + "--a1(=[12 100 20 50]){priority=1};" + // a1 + "--a2(=[18 100 20 50]){priority=1};" + // a2 + "-b(=[30 100 59 50]){priority=1};" + // b + "--b1(=[15 100 30 50]){priority=1};" + // b1 + "--b2(=[15 100 29 50]){priority=2};" + // b2 + "-c(=[40 100 1 30]){priority=1}"; // c + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a1\t(1,1,n1,,20,false);" + // app1 in a1 + "a2\t(1,1,n1,,20,false);" + // app2 in a2 + "b1\t(1,1,n1,,30,false);" + // app3 in b1 + "b2\t(1,1,n1,,29,false);" + // app4 in b2 + "c\t(1,1,n1,,29,false)"; // app5 in c + + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // Preemption should first divide capacities between a / b, and b2 should + // get less preemption than b1 (because b2 has higher priority) + verify(mDisp, times(5)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + verify(mDisp, times(15)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + verify(mDisp, times(9)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(4)))); + } + +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 7eca34f..a14a2b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; @@ -220,7 +221,9 @@ public class TestProportionalCapacityPreemptionPolicy { }; 
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); policy.editSchedule(); - verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA))); + + // A will preempt guaranteed-allocated. + verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA))); } @Test @@ -588,8 +591,8 @@ public class TestProportionalCapacityPreemptionPolicy { }; ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); policy.editSchedule(); - // correct imbalance between over-capacity queues - verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA))); + // Will not preempt for over capacity queues + verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); } @Test @@ -702,7 +705,7 @@ public class TestProportionalCapacityPreemptionPolicy { public void testZeroGuarOverCap() { int[][] qData = new int[][] { // / A B C D E F - { 200, 100, 0, 99, 0, 100, 100 }, // abs + { 200, 100, 0, 100, 0, 100, 100 }, // abs { 200, 200, 200, 200, 200, 200, 200 }, // maxCap { 170, 170, 60, 20, 90, 0, 0 }, // used { 85, 50, 30, 10, 10, 20, 20 }, // pending @@ -713,14 +716,14 @@ public class TestProportionalCapacityPreemptionPolicy { }; ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); policy.editSchedule(); - // we verify both that C has priority on B and D (has it has >0 guarantees) - // and that B and D are force to share their over capacity fairly (as they - // are both zero-guarantees) hence D sees some of its containers preempted - verify(mDisp, times(15)).handle(argThat(new IsPreemptionRequestFor(appC))); + // No preemption should happen because zero guaranteed queues should be + // treated as always satisfied, they should not preempt from each other. 
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); + verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); + verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC))); + verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appD))); } - - @Test public void testHierarchicalLarge() { int[][] qData = new int[][] { @@ -1232,6 +1235,13 @@ public class TestProportionalCapacityPreemptionPolicy { when(pq.getChildQueues()).thenReturn(cqs); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); when(pq.getReadLock()).thenReturn(lock.readLock()); + + // Ordering policy + QueueOrderingPolicy policy = mock(QueueOrderingPolicy.class); + when(policy.getConfigName()).thenReturn( + CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY); + when(pq.getQueueOrderingPolicy()).thenReturn(policy); + when(pq.getPriority()).thenReturn(Priority.newInstance(0)); for (int i = 0; i < subqueues; ++i) { pqs.add(pq); } @@ -1302,6 +1312,7 @@ public class TestProportionalCapacityPreemptionPolicy { } ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); when(lq.getReadLock()).thenReturn(lock.readLock()); + when(lq.getPriority()).thenReturn(Priority.newInstance(0)); p.getChildQueues().add(lq); return lq; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java index e31a889..1fd455a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java @@ -95,7 +95,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions } @Test - public void testNodePartitionPreemptionRespectMaximumCapacity() + public void testNodePartitionPreemptionNotHappenBetweenSatisfiedQueues() throws IOException { /** * Queue structure is: @@ -114,8 +114,8 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions * 2 apps in cluster. * app1 in b and app2 in c. * - * app1 uses 90x, and app2 use 10x. After preemption, app2 will preempt 10x - * from app1 because of max capacity. + * app1 uses 90x, and app2 use 10x. 
We don't expect preemption happen + * between them because all of them are satisfied */ String labelsConfig = "=100,true;" + // default partition @@ -139,9 +139,8 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); policy.editSchedule(); - // 30 preempted from app1, 30 preempted from app4, and nothing preempted - // from app2/app3 - verify(mDisp, times(20)).handle( + // No preemption happens + verify(mDisp, never()).handle( argThat(new IsPreemptionRequestFor(getAppAttemptId(1)))); verify(mDisp, never()).handle( argThat(new IsPreemptionRequestFor(getAppAttemptId(2)))); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java index 07d1eef..964a230 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java @@ -46,8 +46,8 @@ public class 
TestProportionalCapacityPreemptionPolicyMockFramework "root(=[200 200 100 100],red=[100 100 100 100],blue=[200 200 200 200]);" + //root "-a(=[100 200 100 100],red=[0 0 0 0],blue=[200 200 200 200]);" + // a "--a1(=[50 100 50 100],red=[0 0 0 0],blue=[100 200 200 0]);" + // a1 - "--a2(=[50 200 50 0],red=[0 0 0 0],blue=[100 200 0 200]);" + // a2 - "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])"; + "--a2(=[50 200 50 0],red=[0 0 0 0],blue=[100 200 0 200]){priority=2};" + // a2 + "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0]){priority=1,disable_preemption=true}"; String appsConfig= //queueName\t(priority,resource,host,expression,#repeat,reserved) // app1 in a1, , 50 in n2 (reserved), 50 in n2 (allocated) @@ -75,6 +75,7 @@ public class TestProportionalCapacityPreemptionPolicyMockFramework checkPendingResource(cs.getQueue("root"), "red", 100); checkAbsCapacities(cs.getQueue("root"), "blue", 1f, 1f, 1f); checkPendingResource(cs.getQueue("root"), "blue", 200); + checkPriority(cs.getQueue("root"), 0); // default // a checkAbsCapacities(cs.getQueue("a"), "", 0.5f, 1f, 0.5f); @@ -83,6 +84,7 @@ public class TestProportionalCapacityPreemptionPolicyMockFramework checkPendingResource(cs.getQueue("a"), "red", 0); checkAbsCapacities(cs.getQueue("a"), "blue", 1f, 1f, 1f); checkPendingResource(cs.getQueue("a"), "blue", 200); + checkPriority(cs.getQueue("a"), 0); // default // a1 checkAbsCapacities(cs.getQueue("a1"), "", 0.25f, 0.5f, 0.25f); @@ -91,6 +93,7 @@ public class TestProportionalCapacityPreemptionPolicyMockFramework checkPendingResource(cs.getQueue("a1"), "red", 0); checkAbsCapacities(cs.getQueue("a1"), "blue", 0.5f, 1f, 1f); checkPendingResource(cs.getQueue("a1"), "blue", 0); + checkPriority(cs.getQueue("a1"), 0); // default // a2 checkAbsCapacities(cs.getQueue("a2"), "", 0.25f, 1f, 0.25f); @@ -99,14 +102,18 @@ public class TestProportionalCapacityPreemptionPolicyMockFramework checkPendingResource(cs.getQueue("a2"), "red", 0); 
checkAbsCapacities(cs.getQueue("a2"), "blue", 0.5f, 1f, 0f); checkPendingResource(cs.getQueue("a2"), "blue", 200); + checkPriority(cs.getQueue("a2"), 2); + Assert.assertFalse(cs.getQueue("a2").getPreemptionDisabled()); - // b1 + // b checkAbsCapacities(cs.getQueue("b"), "", 0.5f, 1f, 0f); checkPendingResource(cs.getQueue("b"), "", 0); checkAbsCapacities(cs.getQueue("b"), "red", 1f, 1f, 1f); checkPendingResource(cs.getQueue("b"), "red", 100); checkAbsCapacities(cs.getQueue("b"), "blue", 0f, 0f, 0f); checkPendingResource(cs.getQueue("b"), "blue", 0); + checkPriority(cs.getQueue("b"), 1); + Assert.assertTrue(cs.getQueue("b").getPreemptionDisabled()); // Check ignored partitioned containers in queue Assert.assertEquals(100, ((LeafQueue) cs.getQueue("a1")) http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java index bd9f615..943b7d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java @@ -46,7 +46,7 @@ 
public class CapacitySchedulerPreemptionTestBase { final int GB = 1024; - Configuration conf; + CapacitySchedulerConfiguration conf; RMNodeLabelsManager mgr; @@ -54,13 +54,15 @@ public class CapacitySchedulerPreemptionTestBase { @Before void setUp() throws Exception { - conf = new YarnConfiguration(); + conf = new CapacitySchedulerConfiguration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class); - conf = TestUtils.getConfigurationWithMultipleQueues(this.conf); + conf = (CapacitySchedulerConfiguration) TestUtils + .getConfigurationWithMultipleQueues(this.conf); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 100 * GB); // Set preemption related configurations conf.setInt(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, @@ -146,4 +148,18 @@ public class CapacitySchedulerPreemptionTestBase { Assert.fail(); } + + public void checkNumberOfPreemptionCandidateFromApp( + ProportionalCapacityPreemptionPolicy policy, int expected, + ApplicationAttemptId attemptId) { + int total = 0; + + for (RMContainer rmContainer : policy.getToPreemptContainers().keySet()) { + if (rmContainer.getApplicationAttemptId().equals(attemptId)) { + ++ total; + } + } + + Assert.assertEquals(expected, total); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 7382f3d..046ea4a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -110,9 +110,6 @@ public class TestApplicationLimits { thenReturn(Resources.createResource(16*GB, 32)); when(csContext.getClusterResource()). thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32)); - when(csContext.getNonPartitionedQueueComparator()). - thenReturn( - CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR); when(csContext.getResourceCalculator()). thenReturn(resourceCalculator); when(csContext.getRMContext()).thenReturn(rmContext); @@ -276,9 +273,6 @@ public class TestApplicationLimits { thenReturn(Resources.createResource(GB, 1)); when(csContext.getMaximumResourceCapability()). thenReturn(Resources.createResource(16*GB, 16)); - when(csContext.getNonPartitionedQueueComparator()). - thenReturn( - CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); when(csContext.getRMContext()).thenReturn(rmContext); when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); @@ -581,9 +575,6 @@ public class TestApplicationLimits { thenReturn(Resources.createResource(GB)); when(csContext.getMaximumResourceCapability()). 
thenReturn(Resources.createResource(16*GB)); - when(csContext.getNonPartitionedQueueComparator()). - thenReturn( - CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); when(csContext.getRMContext()).thenReturn(rmContext); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java index 1f87c53..2fa06e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java @@ -594,9 +594,6 @@ public class TestApplicationLimitsByPartition { .thenReturn(Resources.createResource(GB)); when(csContext.getMaximumResourceCapability()) .thenReturn(Resources.createResource(16 * GB)); - when(csContext.getNonPartitionedQueueComparator()) - .thenReturn( - CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); RMContext rmContext = TestUtils.getMockRMContext(); RMContext spyRMContext = 
spy(rmContext); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java index db6115c..5989da0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java @@ -22,22 +22,26 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; 
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; +import java.util.Set; public class TestCapacitySchedulerSurgicalPreemption extends CapacitySchedulerPreemptionTestBase { @@ -167,8 +171,7 @@ public class TestCapacitySchedulerSurgicalPreemption * * 1) Two nodes (n1/n2) in the cluster, each of them has 20G. * - * 2) app1 submit to queue-a first, it asked 38 * 1G containers - * We will allocate 20 on n1 and 19 on n2. + * 2) app1 submit to queue-b, asks for 1G * 5 * * 3) app2 submit to queue-c, ask for one 4G container (for AM) * @@ -243,4 +246,569 @@ public class TestCapacitySchedulerSurgicalPreemption rm1.close(); } + + @Test(timeout = 60000) + public void testPriorityPreemptionWhenAllQueuesAreBelowGuaranteedCapacities() + throws Exception { + /** + * Test case: Submit two application (app1/app2) to different queues, queue + * structure: + * + * <pre> + * Root + * / | \ + * a b c + * 10 20 70 + * </pre> + * + * 1) Two nodes (n1/n2) in the cluster, each of them has 20G. + * + * 2) app1 submit to queue-b first, it asked 6 * 1G containers + * We will allocate 4 on n1 (including AM) and 3 on n2. + * + * 3) app2 submit to queue-c, ask for one 18G container (for AM) + * + * After preemption, we should expect: + * Preempt 3 containers from app1 and AM of app2 successfully allocated. 
+ */ + conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true); + conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000); + conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT, + CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY); + + // Queue c has higher priority than a/b + conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1); + + MockRM rm1 = new MockRM(conf); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + + MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB); + MockNM nm2 = rm1.registerNode("h2:1234", 20 * GB); + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + am1.allocate("*", 1 * GB, 6, new ArrayList<>()); + + // Do allocation for node1/node2 + for (int i = 0; i < 3; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + // App1 should have 7 containers now, so the abs-used-cap of b is + // 7 / 40 = 17.5% < 20% (guaranteed) + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); + // 4 from n1 and 3 from n2 + waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()), + am1.getApplicationAttemptId(), 4); + waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()), + am1.getApplicationAttemptId(), 3); + + // Submit app2 to queue-c and asks for a 1G container for AM + RMApp app2 = rm1.submitApp(18 * GB, "app", "user", null, "c"); + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( + 
ApplicationAttemptId.newInstance(app2.getApplicationId(), 1)); + + while (cs.getNode(rmNode1.getNodeID()).getReservedContainer() == null) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + Thread.sleep(10); + } + + // Call editSchedule immediately: containers are not selected + ProportionalCapacityPreemptionPolicy editPolicy = + (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); + editPolicy.editSchedule(); + Assert.assertEquals(0, editPolicy.getToPreemptContainers().size()); + + // Sleep the timeout interval, we should be able to see containers selected + Thread.sleep(1000); + editPolicy.editSchedule(); + Assert.assertEquals(2, editPolicy.getToPreemptContainers().size()); + + // Call editSchedule again: selected containers are killed, and new AM + // container launched + editPolicy.editSchedule(); + + // Do allocation till reserved container allocated + while (cs.getNode(rmNode1.getNodeID()).getReservedContainer() != null) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + Thread.sleep(10); + } + + waitNumberOfLiveContainersFromApp(schedulerApp2, 1); + + rm1.close(); + } + + @Test(timeout = 300000) + public void testPriorityPreemptionRequiresMoveReservation() + throws Exception { + /** + * Test case: Submit two application (app1/app2) to different queues, queue + * structure: + * + * <pre> + * Root + * / | \ + * a b c + * 10 20 70 + * </pre> + * + * 1) 3 nodes in the cluster, 10G for each + * + * 2) app1 submit to queue-b first, it asked 2G each, + * it can get 2G on n1 (AM), 2 * 2G on n2 + * + * 3) app2 submit to queue-c, with 2G AM container (allocated on n3) + * app2 requires 9G resource, which will be reserved on n3 + * + * We should expect container unreserved from n3 and allocated on n1/n2 + */ + conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true); + conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000); + conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT, + 
CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY); + conf.setPUOrderingPolicyUnderUtilizedPreemptionMoveReservation(true); + + // Queue c has higher priority than a/b + conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1); + + MockRM rm1 = new MockRM(conf); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + + MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); + MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); + MockNM nm3 = rm1.registerNode("h3:1234", 10 * GB); + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + RMNode rmNode3 = rm1.getRMContext().getRMNodes().get(nm3.getNodeId()); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "b"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + am1.allocate("*", 2 * GB, 2, new ArrayList<>()); + + // Do allocation for node2 twice + for (int i = 0; i < 2; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(3, schedulerApp1.getLiveContainers().size()); + + // 1 from n1 and 2 from n2 + waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()), + am1.getApplicationAttemptId(), 1); + waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()), + am1.getApplicationAttemptId(), 2); + + // Submit app2 to queue-c and asks for a 2G container for AM, on n3 + RMApp app2 = rm1.submitApp(2 * GB, "app", "user", null, "c"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3); + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( + ApplicationAttemptId.newInstance(app2.getApplicationId(), 1)); + + // Asks 1 * 
9G container + am2.allocate("*", 9 * GB, 1, new ArrayList<>()); + + // Do allocation for node3 once + cs.handle(new NodeUpdateSchedulerEvent(rmNode3)); + + // Make sure container reserved on node3 + Assert.assertNotNull( + cs.getNode(rmNode3.getNodeID()).getReservedContainer()); + + // Call editSchedule immediately: nothing happens + ProportionalCapacityPreemptionPolicy editPolicy = + (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); + editPolicy.editSchedule(); + Assert.assertNotNull( + cs.getNode(rmNode3.getNodeID()).getReservedContainer()); + + // Sleep the timeout interval, we should be able to see reserved container + // moved to n2 (n1 occupied by AM) + Thread.sleep(1000); + editPolicy.editSchedule(); + Assert.assertNull( + cs.getNode(rmNode3.getNodeID()).getReservedContainer()); + Assert.assertNotNull( + cs.getNode(rmNode2.getNodeID()).getReservedContainer()); + Assert.assertEquals(am2.getApplicationAttemptId(), cs.getNode( + rmNode2.getNodeID()).getReservedContainer().getApplicationAttemptId()); + + // Do it again, we should see containers marked to be preempt + editPolicy.editSchedule(); + Assert.assertEquals(2, editPolicy.getToPreemptContainers().size()); + + // Call editSchedule again: selected containers are killed + editPolicy.editSchedule(); + + // Do allocation till reserved container allocated + while (schedulerApp2.getLiveContainers().size() < 2) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + Thread.sleep(200); + } + + waitNumberOfLiveContainersFromApp(schedulerApp1, 1); + + rm1.close(); + } + + @Test(timeout = 60000) + public void testPriorityPreemptionOnlyTriggeredWhenDemandingQueueUnsatisfied() + throws Exception { + /** + * Test case: Submit two application (app1/app2) to different queues, queue + * structure: + * + * <pre> + * Root + * / | \ + * a b c + * 10 20 70 + * </pre> + * + * 1) 10 nodes (n0-n9) in the cluster, each of them has 10G. 
+ * + * 2) app1 submit to queue-b first, it asked 8 * 1G containers + * We will allocate 1 container on each of n0-n10 + * + * 3) app2 submit to queue-c, ask for 10 * 10G containers (including AM) + * + * After preemption, we should expect: + * Preempt 7 containers from app1 and usage of app2 is 70% + */ + conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true); + conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000); + conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT, + CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY); + + // Queue c has higher priority than a/b + conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1); + + MockRM rm1 = new MockRM(conf); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + + MockNM[] mockNMs = new MockNM[10]; + for (int i = 0; i < 10; i++) { + mockNMs[i] = rm1.registerNode("h" + i + ":1234", 10 * GB); + } + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + + RMNode[] rmNodes = new RMNode[10]; + for (int i = 0; i < 10; i++) { + rmNodes[i] = rm1.getRMContext().getRMNodes().get(mockNMs[i].getNodeId()); + } + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[0]); + + am1.allocate("*", 1 * GB, 8, new ArrayList<>()); + + // Do allocation for nm1-nm8 + for (int i = 1; i < 9; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i])); + } + + // App1 should have 9 containers now, so the abs-used-cap of b is 9% + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(9, schedulerApp1.getLiveContainers().size()); + for (int i = 0; i < 9; i++) { + waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNodes[i].getNodeID()), + am1.getApplicationAttemptId(), 1); + } + + // Submit app2 to queue-c and asks for a 10G container for AM + // Launch AM 
in NM9 + RMApp app2 = rm1.submitApp(10 * GB, "app", "user", null, "c"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[9]); + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( + ApplicationAttemptId.newInstance(app2.getApplicationId(), 1)); + + // Ask 10 * 10GB containers + am2.allocate("*", 10 * GB, 10, new ArrayList<>()); + + // Do allocation for all nms + for (int i = 1; i < 10; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i])); + } + + // Check am2 reserved resource from nm1-nm9 + for (int i = 1; i < 9; i++) { + Assert.assertNotNull("Should reserve on nm-" + i, + cs.getNode(rmNodes[i].getNodeID()).getReservedContainer()); + } + + // Sleep the timeout interval, we should be able to see 6 containers selected + // 6 (selected) + 1 (allocated) which makes target capacity to 70% + Thread.sleep(1000); + + ProportionalCapacityPreemptionPolicy editPolicy = + (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); + editPolicy.editSchedule(); + checkNumberOfPreemptionCandidateFromApp(editPolicy, 6, + am1.getApplicationAttemptId()); + + // Call editSchedule again: selected containers are killed + editPolicy.editSchedule(); + waitNumberOfLiveContainersFromApp(schedulerApp1, 3); + + // Do allocation for all nms + for (int i = 1; i < 10; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i])); + } + + waitNumberOfLiveContainersFromApp(schedulerApp2, 7); + waitNumberOfLiveContainersFromApp(schedulerApp1, 3); + + rm1.close(); + } + + @Test(timeout = 600000) + public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer() + throws Exception { + /** + * Test case: Submit two application (app1/app2) to different queues, queue + * structure: + * + * <pre> + * Root + * / | \ + * a b c + * 45 45 10 + * </pre> + * + * Priority of queue_a = 1 + * Priority of queue_b = 2 + * + * 1) 5 nodes (n0-n4) in the cluster, each of them has 4G. 
+ * + * 2) app1 submit to queue-c first (AM=1G), it asked 4 * 1G containers + * We will allocate 1 container on each of n0-n4. AM on n4. + * + * 3) app2 submit to queue-a, AM container=0.5G, allocated on n0 + * Ask for 2 * 3.5G containers. (Reserved on n0/n1) + * + * 4) app2 submit to queue-b, AM container=0.5G, allocated on n2 + * Ask for 2 * 3.5G containers. (Reserved on n2/n3) + * + * First we will preempt container on n2 since it is the oldest container of + * Highest priority queue (b) + */ + + // Total preemption = 1G per round, which is 5% of cluster resource (20G) + conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, + 0.05f); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); + conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true); + conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000); + conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT, + CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY); + + // A/B has higher priority + conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".a", 1); + conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".b", 2); + conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a", 45f); + conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".b", 45f); + conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c", 10f); + + MockRM rm1 = new MockRM(conf); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + + MockNM[] mockNMs = new MockNM[5]; + for (int i = 0; i < 5; i++) { + mockNMs[i] = rm1.registerNode("h" + i + ":1234", 4 * GB); + } + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + + RMNode[] rmNodes = new RMNode[5]; + for (int i = 0; i < 5; i++) { + rmNodes[i] = rm1.getRMContext().getRMNodes().get(mockNMs[i].getNodeId()); + } + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + MockAM am1 = 
MockRM.launchAndRegisterAM(app1, rm1, mockNMs[4]); + + am1.allocate("*", 1 * GB, 4, new ArrayList<>()); + + // Do allocation for nm1-nm8 + for (int i = 0; i < 4; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i])); + } + + // App1 should have 5 containers now, one for each node + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(5, schedulerApp1.getLiveContainers().size()); + for (int i = 0; i < 5; i++) { + waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNodes[i].getNodeID()), + am1.getApplicationAttemptId(), 1); + } + + // Submit app2 to queue-a and asks for a 0.5G container for AM (on n0) + RMApp app2 = rm1.submitApp(512, "app", "user", null, "a"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[0]); + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( + ApplicationAttemptId.newInstance(app2.getApplicationId(), 1)); + + // Ask 2 * 3.5GB containers + am2.allocate("*", 3 * GB + 512, 2, new ArrayList<>()); + + // Do allocation for n0-n1 + for (int i = 0; i < 2; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i])); + } + + // Check am2 reserved resource from nm0-nm1 + for (int i = 0; i < 2; i++) { + Assert.assertNotNull("Should reserve on nm-" + i, + cs.getNode(rmNodes[i].getNodeID()).getReservedContainer()); + Assert.assertEquals(cs.getNode(rmNodes[i].getNodeID()) + .getReservedContainer().getQueueName(), "a"); + } + + // Submit app3 to queue-b and asks for a 0.5G container for AM (on n2) + RMApp app3 = rm1.submitApp(512, "app", "user", null, "b"); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, mockNMs[2]); + FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt( + ApplicationAttemptId.newInstance(app3.getApplicationId(), 1)); + + // Ask 2 * 3.5GB containers + am3.allocate("*", 3 * GB + 512, 2, new ArrayList<>()); + + // Do allocation for n2-n3 + for (int i = 2; i < 4; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i])); + } + + // Check am2 
reserved resource from nm2-nm3 + for (int i = 2; i < 4; i++) { + Assert.assertNotNull("Should reserve on nm-" + i, + cs.getNode(rmNodes[i].getNodeID()).getReservedContainer()); + Assert.assertEquals(cs.getNode(rmNodes[i].getNodeID()) + .getReservedContainer().getQueueName(), "b"); + } + + // Sleep the timeout interval, we should be able to see 1 container selected + Thread.sleep(1000); + + /* 1st container preempted is on n2 */ + ProportionalCapacityPreemptionPolicy editPolicy = + (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); + editPolicy.editSchedule(); + + // We should have one to-preempt container, on node[2] + Set<RMContainer> selectedToPreempt = + editPolicy.getToPreemptContainers().keySet(); + Assert.assertEquals(1, selectedToPreempt.size()); + Assert.assertEquals(mockNMs[2].getNodeId(), + selectedToPreempt.iterator().next().getAllocatedNode()); + + // Call editSchedule again: selected containers are killed + editPolicy.editSchedule(); + + // Do allocation for all nms + for (int i = 0; i < 4; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i])); + } + waitNumberOfLiveContainersFromApp(schedulerApp1, 4); + + waitNumberOfLiveContainersFromApp(schedulerApp1, 4); + waitNumberOfLiveContainersFromApp(schedulerApp2, 1); + waitNumberOfLiveContainersFromApp(schedulerApp3, 2); + + /* 2nd container preempted is on n3 */ + editPolicy.editSchedule(); + + // We should have one to-preempt container, on node[3] + selectedToPreempt = + editPolicy.getToPreemptContainers().keySet(); + Assert.assertEquals(1, selectedToPreempt.size()); + Assert.assertEquals(mockNMs[3].getNodeId(), + selectedToPreempt.iterator().next().getAllocatedNode()); + + // Call editSchedule again: selected containers are killed + editPolicy.editSchedule(); + waitNumberOfLiveContainersFromApp(schedulerApp1, 3); + + // Do allocation for all nms + for (int i = 0; i < 4; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i])); + } + + 
waitNumberOfLiveContainersFromApp(schedulerApp1, 3); + waitNumberOfLiveContainersFromApp(schedulerApp2, 1); + waitNumberOfLiveContainersFromApp(schedulerApp3, 3); + + /* 3rd container preempted is on n0 */ + editPolicy.editSchedule(); + + // We should have one to-preempt container, on node[0] + selectedToPreempt = + editPolicy.getToPreemptContainers().keySet(); + Assert.assertEquals(1, selectedToPreempt.size()); + Assert.assertEquals(mockNMs[0].getNodeId(), + selectedToPreempt.iterator().next().getAllocatedNode()); + + // Call editSchedule again: selected containers are killed + editPolicy.editSchedule(); + waitNumberOfLiveContainersFromApp(schedulerApp1, 2); + + // Do allocation for all nms + for (int i = 0; i < 4; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i])); + } + + waitNumberOfLiveContainersFromApp(schedulerApp1, 2); + waitNumberOfLiveContainersFromApp(schedulerApp2, 2); + waitNumberOfLiveContainersFromApp(schedulerApp3, 3); + + /* 4th container preempted is on n1 */ + editPolicy.editSchedule(); + + // We should have one to-preempt container, on node[0] + selectedToPreempt = + editPolicy.getToPreemptContainers().keySet(); + Assert.assertEquals(1, selectedToPreempt.size()); + Assert.assertEquals(mockNMs[1].getNodeId(), + selectedToPreempt.iterator().next().getAllocatedNode()); + + // Call editSchedule again: selected containers are killed + editPolicy.editSchedule(); + waitNumberOfLiveContainersFromApp(schedulerApp1, 1); + + // Do allocation for all nms + for (int i = 0; i < 4; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i])); + } + + waitNumberOfLiveContainersFromApp(schedulerApp1, 1); + waitNumberOfLiveContainersFromApp(schedulerApp2, 3); + waitNumberOfLiveContainersFromApp(schedulerApp3, 3); + + rm1.close(); + } + } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce832059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index 3c1f676..899523c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -99,9 +99,6 @@ public class TestChildQueueOrder { Resources.createResource(16*GB, 32)); when(csContext.getClusterResource()). thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); - when(csContext.getNonPartitionedQueueComparator()). - thenReturn( - CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR); when(csContext.getResourceCalculator()). thenReturn(resourceComparator); when(csContext.getRMContext()).thenReturn(rmContext); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org