Repository: hadoop Updated Branches: refs/heads/YARN-2915 c58725b8d -> 08dc09581 (forced update)
YARN-5830. FairScheduler: Avoid preempting AM containers. (Yufei Gu via kasha) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/abedb8a9 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/abedb8a9 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/abedb8a9 Branch: refs/heads/YARN-2915 Commit: abedb8a9d86b4593a37fd3d2313fbcb057c7846a Parents: b782bf2 Author: Karthik Kambatla <ka...@apache.org> Authored: Wed Jan 25 12:17:28 2017 -0800 Committer: Karthik Kambatla <ka...@apache.org> Committed: Wed Jan 25 12:17:28 2017 -0800 ---------------------------------------------------------------------- .../scheduler/SchedulerNode.java | 21 ++- .../scheduler/fair/FSPreemptionThread.java | 135 ++++++++++++++----- .../fair/TestFairSchedulerPreemption.java | 103 +++++++++++--- 3 files changed, 206 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/abedb8a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 59ca81b..9c2dff3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.util.ArrayList; +import java.util.LinkedList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -370,8 +371,8 @@ public abstract class SchedulerNode { } /** - * Get the running containers in the node. - * @return List of running containers in the node. + * Get the containers running on the node. + * @return A copy of containers running on the node. */ public synchronized List<RMContainer> getCopiedListOfRunningContainers() { List<RMContainer> result = new ArrayList<>(launchedContainers.size()); @@ -382,6 +383,22 @@ public abstract class SchedulerNode { } /** + * Get the containers running on the node with AM containers at the end. + * @return A copy of running containers with AM containers at the end. + */ + public synchronized List<RMContainer> getRunningContainersWithAMsAtTheEnd() { + LinkedList<RMContainer> result = new LinkedList<>(); + for (ContainerInfo info : launchedContainers.values()) { + if(info.container.isAMContainer()) { + result.addLast(info.container); + } else { + result.addFirst(info.container); + } + } + return result; + } + + /** * Get the container for the specified container ID. 
* @param containerId The container ID * @return The container for the specified container ID http://git-wip-us.apache.org/repos/asf/hadoop/blob/abedb8a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java index f432484..f166878 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java @@ -65,10 +65,10 @@ class FSPreemptionThread extends Thread { try{ starvedApp = context.getStarvedApps().take(); if (!Resources.isNone(starvedApp.getStarvation())) { - List<RMContainer> containers = + PreemptableContainers containers = identifyContainersToPreempt(starvedApp); if (containers != null) { - preemptContainers(containers); + preemptContainers(containers.containers); } } } catch (InterruptedException e) { @@ -87,9 +87,9 @@ class FSPreemptionThread extends Thread { * @return list of containers to preempt to satisfy starvedApp, null if the * app cannot be satisfied by preempting any running containers */ - private List<RMContainer> identifyContainersToPreempt( + private PreemptableContainers identifyContainersToPreempt( FSAppAttempt starvedApp) { - List<RMContainer> 
containers = new ArrayList<>(); // return value + PreemptableContainers bestContainers = null; // Find the nodes that match the next resource request SchedulingPlacementSet nextPs = @@ -107,9 +107,6 @@ class FSPreemptionThread extends Thread { // From the potential nodes, pick a node that has enough containers // from apps over their fairshare for (FSSchedulerNode node : potentialNodes) { - // Reset containers for the new node being considered. - containers.clear(); - // TODO (YARN-5829): Attempt to reserve the node for starved app. The // subsequent if-check needs to be reworked accordingly. FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable(); @@ -119,39 +116,81 @@ class FSPreemptionThread extends Thread { continue; } - // Figure out list of containers to consider - List<RMContainer> containersToCheck = - node.getCopiedListOfRunningContainers(); - containersToCheck.removeAll(node.getContainersForPreemption()); - - // Initialize potential with unallocated resources - Resource potential = Resources.clone(node.getUnallocatedResource()); - for (RMContainer container : containersToCheck) { - FSAppAttempt app = - scheduler.getSchedulerApp(container.getApplicationAttemptId()); - - if (app.canContainerBePreempted(container)) { - // Flag container for preemption - containers.add(container); - Resources.addTo(potential, container.getAllocatedResource()); + int maxAMContainers = bestContainers == null ? + Integer.MAX_VALUE : bestContainers.numAMContainers; + PreemptableContainers preemptableContainers = + identifyContainersToPreemptOnNode(requestCapability, node, + maxAMContainers); + if (preemptableContainers != null) { + if (preemptableContainers.numAMContainers == 0) { + return preemptableContainers; + } else { + bestContainers = preemptableContainers; } + } + } - // Check if we have already identified enough containers - if (Resources.fitsIn(requestCapability, potential)) { - // Mark the containers as being considered for preemption on the node. 
- // Make sure the containers are subsequently removed by calling - // FSSchedulerNode#removeContainerForPreemption. - node.addContainersForPreemption(containers); - return containers; - } else { - // TODO (YARN-5829): Unreserve the node for the starved app. + return bestContainers; + } + + /** + * Identify containers to preempt on a given node. Try to find a list with + * least AM containers to avoid preempting AM containers. This method returns + * a non-null set of containers only if the number of AM containers is less + * than maxAMContainers. + * + * @param request resource requested + * @param node the node to check + * @param maxAMContainers max allowed AM containers in the set + * @return list of preemptable containers with fewer AM containers than + * maxAMContainers if such a list exists; null otherwise. + */ + private PreemptableContainers identifyContainersToPreemptOnNode( + Resource request, FSSchedulerNode node, int maxAMContainers) { + PreemptableContainers preemptableContainers = + new PreemptableContainers(maxAMContainers); + + // Figure out list of containers to consider + List<RMContainer> containersToCheck = + node.getRunningContainersWithAMsAtTheEnd(); + containersToCheck.removeAll(node.getContainersForPreemption()); + + // Initialize potential with unallocated resources + Resource potential = Resources.clone(node.getUnallocatedResource()); + + for (RMContainer container : containersToCheck) { + FSAppAttempt app = + scheduler.getSchedulerApp(container.getApplicationAttemptId()); + + if (app.canContainerBePreempted(container)) { + // Flag container for preemption + if (!preemptableContainers.addContainer(container)) { + return null; } + + Resources.addTo(potential, container.getAllocatedResource()); + } + + // Check if we have already identified enough containers + if (Resources.fitsIn(request, potential)) { + return preemptableContainers; + } else { + // TODO (YARN-5829): Unreserve the node for the starved app. 
} } return null; } private void preemptContainers(List<RMContainer> containers) { + // Mark the containers as being considered for preemption on the node. + // Make sure the containers are subsequently removed by calling + // FSSchedulerNode#removeContainerForPreemption. + if (containers.size() > 0) { + FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker() + .getNode(containers.get(0).getNodeId()); + node.addContainersForPreemption(containers); + } + // Warn application about containers to be killed for (RMContainer container : containers) { ApplicationAttemptId appAttemptId = container.getApplicationAttemptId(); @@ -190,4 +229,38 @@ class FSPreemptionThread extends Thread { } } } + + /** + * A class to track preemptable containers. + */ + private static class PreemptableContainers { + List<RMContainer> containers; + int numAMContainers; + int maxAMContainers; + + PreemptableContainers(int maxAMContainers) { + containers = new ArrayList<>(); + numAMContainers = 0; + this.maxAMContainers = maxAMContainers; + } + + /** + * Add a container if the number of AM containers is less than + * maxAMContainers. 
+ * + * @param container the container to add + * @return true if success; false otherwise + */ + private boolean addContainer(RMContainer container) { + if (container.isAMContainer()) { + numAMContainers++; + if (numAMContainers >= maxAMContainers) { + return false; + } + } + + containers.add(container); + return true; + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/abedb8a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java index 8bc6cf5..16df1ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java @@ -21,6 +21,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.junit.After; import static 
org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -34,8 +36,10 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.List; /** * Tests to verify fairshare and minshare preemption, using parameterization. @@ -43,6 +47,7 @@ import java.util.Collection; @RunWith(Parameterized.class) public class TestFairSchedulerPreemption extends FairSchedulerTestBase { private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues"); + private static final int GB = 1024; // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore) private static final int NODE_CAPACITY_MULTIPLE = 4; @@ -165,8 +170,8 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { scheduler = (FairScheduler) resourceManager.getResourceScheduler(); // Create and add two nodes to the cluster - addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE); - addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE); + addNode(NODE_CAPACITY_MULTIPLE * GB, NODE_CAPACITY_MULTIPLE); + addNode(NODE_CAPACITY_MULTIPLE * GB, NODE_CAPACITY_MULTIPLE); // Verify if child-1 and child-2 are preemptable FSQueue child1 = @@ -188,41 +193,60 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { } /** - * Submit application to {@code queue1} and take over the entire cluster. - * Submit application with larger containers to {@code queue2} that - * requires preemption from the first application. + * Submit an application to a given queue and take over the entire cluster. 
* - * @param queue1 first queue - * @param queue2 second queue - * @throws InterruptedException if interrupted while waiting + * @param queueName queue name */ - private void submitApps(String queue1, String queue2) - throws InterruptedException { + private void takeAllResource(String queueName) { // Create an app that takes up all the resources on the cluster - ApplicationAttemptId appAttemptId1 - = createSchedulingRequest(1024, 1, queue1, "default", + ApplicationAttemptId appAttemptId + = createSchedulingRequest(GB, 1, queueName, "default", NODE_CAPACITY_MULTIPLE * rmNodes.size()); - greedyApp = scheduler.getSchedulerApp(appAttemptId1); + greedyApp = scheduler.getSchedulerApp(appAttemptId); scheduler.update(); sendEnoughNodeUpdatesToAssignFully(); assertEquals(8, greedyApp.getLiveContainers().size()); // Verify preemptable for queue and app attempt assertTrue( - scheduler.getQueueManager().getQueue(queue1).isPreemptable() - == greedyApp.isPreemptable()); + scheduler.getQueueManager().getQueue(queueName).isPreemptable() + == greedyApp.isPreemptable()); + } - // Create an app that takes up all the resources on the cluster - ApplicationAttemptId appAttemptId2 - = createSchedulingRequest(2048, 2, queue2, "default", + /** + * Submit an application to a given queue and preempt half resources of the + * cluster. + * + * @param queueName queue name + * @throws InterruptedException + * if any thread has interrupted the current thread. + */ + private void preemptHalfResources(String queueName) + throws InterruptedException { + ApplicationAttemptId appAttemptId + = createSchedulingRequest(2 * GB, 2, queueName, "default", NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2); - starvingApp = scheduler.getSchedulerApp(appAttemptId2); + starvingApp = scheduler.getSchedulerApp(appAttemptId); // Sleep long enough to pass Thread.sleep(10); - scheduler.update(); } + /** + * Submit application to {@code queue1} and take over the entire cluster. 
+ * Submit application with larger containers to {@code queue2} that + * requires preemption from the first application. + * + * @param queue1 first queue + * @param queue2 second queue + * @throws InterruptedException if interrupted while waiting + */ + private void submitApps(String queue1, String queue2) + throws InterruptedException { + takeAllResource(queue1); + preemptHalfResources(queue2); + } + private void verifyPreemption() throws InterruptedException { // Sleep long enough for four containers to be preempted. Note that the // starved app must be queued four times for containers to be preempted. @@ -285,4 +309,43 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { submitApps("root.nonpreemptable.child-1", "root.preemptable.child-1"); verifyNoPreemption(); } + + /** + * Set the number of AM containers for each node. + * + * @param numAMContainersPerNode number of AM containers per node + */ + private void setNumAMContainersPerNode(int numAMContainersPerNode) { + List<FSSchedulerNode> potentialNodes = + scheduler.getNodeTracker().getNodesByResourceName("*"); + for (FSSchedulerNode node: potentialNodes) { + List<RMContainer> containers= + node.getCopiedListOfRunningContainers(); + // Change the first numAMContainersPerNode out of 4 containers to + // AM containers + for (int i = 0; i < numAMContainersPerNode; i++) { + ((RMContainerImpl) containers.get(i)).setAMContainer(true); + } + } + } + + @Test + public void testPreemptionSelectNonAMContainer() throws Exception { + setupCluster(); + + takeAllResource("root.preemptable.child-1"); + setNumAMContainersPerNode(2); + preemptHalfResources("root.preemptable.child-2"); + + verifyPreemption(); + + ArrayList<RMContainer> containers = + (ArrayList<RMContainer>) starvingApp.getLiveContainers(); + String host0 = containers.get(0).getNodeId().getHost(); + String host1 = containers.get(1).getNodeId().getHost(); + // Each node provides two and only two non-AM containers to be preempted, so + // 
the preemption happens on both nodes. + assertTrue("Preempted containers should come from two different " + + "nodes.", !host0.equals(host1)); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org