Repository: hadoop
Updated Branches:
  refs/heads/YARN-2915 c58725b8d -> 08dc09581 (forced update)


YARN-5830. FairScheduler: Avoid preempting AM containers. (Yufei Gu via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/abedb8a9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/abedb8a9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/abedb8a9

Branch: refs/heads/YARN-2915
Commit: abedb8a9d86b4593a37fd3d2313fbcb057c7846a
Parents: b782bf2
Author: Karthik Kambatla <ka...@apache.org>
Authored: Wed Jan 25 12:17:28 2017 -0800
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Wed Jan 25 12:17:28 2017 -0800

----------------------------------------------------------------------
 .../scheduler/SchedulerNode.java                |  21 ++-
 .../scheduler/fair/FSPreemptionThread.java      | 135 ++++++++++++++-----
 .../fair/TestFairSchedulerPreemption.java       | 103 +++++++++++---
 3 files changed, 206 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/abedb8a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 59ca81b..9c2dff3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -370,8 +371,8 @@ public abstract class SchedulerNode {
   }
 
   /**
-   * Get the running containers in the node.
-   * @return List of running containers in the node.
+   * Get the containers running on the node.
+   * @return A copy of containers running on the node.
    */
   public synchronized List<RMContainer> getCopiedListOfRunningContainers() {
     List<RMContainer> result = new ArrayList<>(launchedContainers.size());
@@ -382,6 +383,22 @@ public abstract class SchedulerNode {
   }
 
   /**
+   * Return a snapshot of the containers launched on this node, ordered so
+   * that every non-AM container precedes every AM container.
+   *
+   * @return a newly built list; non-AM containers first, AM containers last
+   */
+  public synchronized List<RMContainer> getRunningContainersWithAMsAtTheEnd() {
+    LinkedList<RMContainer> nonAMs = new LinkedList<>();
+    LinkedList<RMContainer> ams = new LinkedList<>();
+    for (ContainerInfo launched : launchedContainers.values()) {
+      RMContainer rmContainer = launched.container;
+      if (rmContainer.isAMContainer()) {
+        ams.addLast(rmContainer);
+      } else {
+        // Prepend, so non-AM containers end up in reverse iteration order.
+        nonAMs.addFirst(rmContainer);
+      }
+    }
+    // AM containers keep their iteration order and trail all the others.
+    nonAMs.addAll(ams);
+    return nonAMs;
+  }
+
+  /**
    * Get the container for the specified container ID.
    * @param containerId The container ID
    * @return The container for the specified container ID

http://git-wip-us.apache.org/repos/asf/hadoop/blob/abedb8a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index f432484..f166878 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -65,10 +65,10 @@ class FSPreemptionThread extends Thread {
       try{
         starvedApp = context.getStarvedApps().take();
         if (!Resources.isNone(starvedApp.getStarvation())) {
-          List<RMContainer> containers =
+          PreemptableContainers containers =
               identifyContainersToPreempt(starvedApp);
           if (containers != null) {
-            preemptContainers(containers);
+            preemptContainers(containers.containers);
           }
         }
       } catch (InterruptedException e) {
@@ -87,9 +87,9 @@ class FSPreemptionThread extends Thread {
    * @return list of containers to preempt to satisfy starvedApp, null if the
    * app cannot be satisfied by preempting any running containers
    */
-  private List<RMContainer> identifyContainersToPreempt(
+  private PreemptableContainers identifyContainersToPreempt(
       FSAppAttempt starvedApp) {
-    List<RMContainer> containers = new ArrayList<>(); // return value
+    PreemptableContainers bestContainers = null;
 
     // Find the nodes that match the next resource request
     SchedulingPlacementSet nextPs =
@@ -107,9 +107,6 @@ class FSPreemptionThread extends Thread {
     // From the potential nodes, pick a node that has enough containers
     // from apps over their fairshare
     for (FSSchedulerNode node : potentialNodes) {
-      // Reset containers for the new node being considered.
-      containers.clear();
-
       // TODO (YARN-5829): Attempt to reserve the node for starved app. The
       // subsequent if-check needs to be reworked accordingly.
       FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable();
@@ -119,39 +116,81 @@ class FSPreemptionThread extends Thread {
         continue;
       }
 
-      // Figure out list of containers to consider
-      List<RMContainer> containersToCheck =
-          node.getCopiedListOfRunningContainers();
-      containersToCheck.removeAll(node.getContainersForPreemption());
-
-      // Initialize potential with unallocated resources
-      Resource potential = Resources.clone(node.getUnallocatedResource());
-      for (RMContainer container : containersToCheck) {
-        FSAppAttempt app =
-            scheduler.getSchedulerApp(container.getApplicationAttemptId());
-
-        if (app.canContainerBePreempted(container)) {
-          // Flag container for preemption
-          containers.add(container);
-          Resources.addTo(potential, container.getAllocatedResource());
+      int maxAMContainers = bestContainers == null ?
+          Integer.MAX_VALUE : bestContainers.numAMContainers;
+      PreemptableContainers preemptableContainers =
+          identifyContainersToPreemptOnNode(requestCapability, node,
+              maxAMContainers);
+      if (preemptableContainers != null) {
+        if (preemptableContainers.numAMContainers == 0) {
+          return preemptableContainers;
+        } else {
+          bestContainers = preemptableContainers;
         }
+      }
+    }
 
-        // Check if we have already identified enough containers
-        if (Resources.fitsIn(requestCapability, potential)) {
-          // Mark the containers as being considered for preemption on the node.
-          // Make sure the containers are subsequently removed by calling
-          // FSSchedulerNode#removeContainerForPreemption.
-          node.addContainersForPreemption(containers);
-          return containers;
-        } else {
-          // TODO (YARN-5829): Unreserve the node for the starved app.
+    return bestContainers;
+  }
+
+  /**
+   * Identify containers to preempt on a given node. Containers are examined
+   * with AM containers last, so AM containers are only selected when the
+   * other preemptable containers on the node are not enough. This method
+   * returns a non-null set of containers only if the number of AM containers
+   * in it is less than maxAMContainers.
+   *
+   * @param request resource requested
+   * @param node the node to check
+   * @param maxAMContainers exclusive upper bound on the number of AM
+   *                        containers allowed in the returned set
+   * @return the preemptable containers, holding fewer AM containers than
+   *         maxAMContainers, once the node's unallocated resources plus the
+   *         selected containers cover the request; null if the AM bound is
+   *         reached first or the node's containers run out.
+   */
+  private PreemptableContainers identifyContainersToPreemptOnNode(
+      Resource request, FSSchedulerNode node, int maxAMContainers) {
+    PreemptableContainers preemptableContainers =
+        new PreemptableContainers(maxAMContainers);
+
+    // Figure out list of containers to consider: a copy of the node's running
+    // containers with AM containers at the end, minus (presumably) those
+    // already marked for preemption on this node.
+    List<RMContainer> containersToCheck =
+        node.getRunningContainersWithAMsAtTheEnd();
+    containersToCheck.removeAll(node.getContainersForPreemption());
+
+    // Initialize potential with unallocated resources
+    Resource potential = Resources.clone(node.getUnallocatedResource());
+
+    for (RMContainer container : containersToCheck) {
+      FSAppAttempt app =
+          scheduler.getSchedulerApp(container.getApplicationAttemptId());
+
+      if (app.canContainerBePreempted(container)) {
+        // Flag container for preemption. addContainer refuses an AM container
+        // once the AM count would reach maxAMContainers; in that case this
+        // node is abandoned.
+        if (!preemptableContainers.addContainer(container)) {
+          return null;
         }
+
+        Resources.addTo(potential, container.getAllocatedResource());
+      }
+
+      // Check if we have already identified enough containers. This runs on
+      // every iteration, including ones where the container was not added.
+      if (Resources.fitsIn(request, potential)) {
+        return preemptableContainers;
+      } else {
+        // TODO (YARN-5829): Unreserve the node for the starved app.
       }
     }
     return null;
   }
 
   private void preemptContainers(List<RMContainer> containers) {
+    // Mark the containers as being considered for preemption on the node.
+    // Make sure the containers are subsequently removed by calling
+    // FSSchedulerNode#removeContainerForPreemption.
+    if (containers.size() > 0) {
+      FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker()
+          .getNode(containers.get(0).getNodeId());
+      node.addContainersForPreemption(containers);
+    }
+
     // Warn application about containers to be killed
     for (RMContainer container : containers) {
       ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
@@ -190,4 +229,38 @@ class FSPreemptionThread extends Thread {
       }
     }
   }
+
+  /**
+   * Tracks the containers selected for preemption, together with how many of
+   * them are AM containers and the bound on that number.
+   */
+  private static class PreemptableContainers {
+    /** Containers selected so far. */
+    final List<RMContainer> containers = new ArrayList<>();
+    /** Exclusive upper bound on {@link #numAMContainers}. */
+    final int maxAMContainers;
+    /** Number of AM containers currently held in {@link #containers}. */
+    int numAMContainers;
+
+    /**
+     * @param maxAMContainers exclusive upper bound on the number of AM
+     *                        containers this set may hold
+     */
+    PreemptableContainers(int maxAMContainers) {
+      this.maxAMContainers = maxAMContainers;
+    }
+
+    /**
+     * Add a container, provided that doing so keeps the number of AM
+     * containers strictly below maxAMContainers. Non-AM containers are always
+     * accepted. A rejected container leaves this object unchanged.
+     *
+     * @param container the container to add
+     * @return true if the container was added; false if it is an AM
+     *         container and adding it would reach maxAMContainers
+     */
+    private boolean addContainer(RMContainer container) {
+      if (container.isAMContainer()) {
+        // Check before counting, so that a rejected AM container does not
+        // leave numAMContainers out of sync with the containers list.
+        if (numAMContainers + 1 >= maxAMContainers) {
+          return false;
+        }
+        numAMContainers++;
+      }
+
+      containers.add(container);
+      return true;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/abedb8a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 8bc6cf5..16df1ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -21,6 +21,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.junit.After;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -34,8 +36,10 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 
 /**
  * Tests to verify fairshare and minshare preemption, using parameterization.
@@ -43,6 +47,7 @@ import java.util.Collection;
 @RunWith(Parameterized.class)
 public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
   private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
+  private static final int GB = 1024;
 
   // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
   private static final int NODE_CAPACITY_MULTIPLE = 4;
@@ -165,8 +170,8 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     scheduler = (FairScheduler) resourceManager.getResourceScheduler();
 
     // Create and add two nodes to the cluster
-    addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
-    addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
+    addNode(NODE_CAPACITY_MULTIPLE * GB, NODE_CAPACITY_MULTIPLE);
+    addNode(NODE_CAPACITY_MULTIPLE * GB, NODE_CAPACITY_MULTIPLE);
 
     // Verify if child-1 and child-2 are preemptable
     FSQueue child1 =
@@ -188,41 +193,60 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
   }
 
   /**
-   * Submit application to {@code queue1} and take over the entire cluster.
-   * Submit application with larger containers to {@code queue2} that
-   * requires preemption from the first application.
+   * Submit an application to a given queue and take over the entire cluster.
    *
-   * @param queue1 first queue
-   * @param queue2 second queue
-   * @throws InterruptedException if interrupted while waiting
+   * @param queueName queue name
    */
-  private void submitApps(String queue1, String queue2)
-      throws InterruptedException {
+  private void takeAllResource(String queueName) {
     // Create an app that takes up all the resources on the cluster
-    ApplicationAttemptId appAttemptId1
-        = createSchedulingRequest(1024, 1, queue1, "default",
+    ApplicationAttemptId appAttemptId
+        = createSchedulingRequest(GB, 1, queueName, "default",
         NODE_CAPACITY_MULTIPLE * rmNodes.size());
-    greedyApp = scheduler.getSchedulerApp(appAttemptId1);
+    greedyApp = scheduler.getSchedulerApp(appAttemptId);
     scheduler.update();
     sendEnoughNodeUpdatesToAssignFully();
     assertEquals(8, greedyApp.getLiveContainers().size());
     // Verify preemptable for queue and app attempt
     assertTrue(
-        scheduler.getQueueManager().getQueue(queue1).isPreemptable()
-        == greedyApp.isPreemptable());
+        scheduler.getQueueManager().getQueue(queueName).isPreemptable()
+            == greedyApp.isPreemptable());
+  }
 
-    // Create an app that takes up all the resources on the cluster
-    ApplicationAttemptId appAttemptId2
-        = createSchedulingRequest(2048, 2, queue2, "default",
+  /**
+   * Submit an application to a given queue and preempt half resources of the
+   * cluster.
+   *
+   * @param queueName queue name
+   * @throws InterruptedException
+   *         if any thread has interrupted the current thread.
+   */
+  private void preemptHalfResources(String queueName)
+      throws InterruptedException {
+    ApplicationAttemptId appAttemptId
+        = createSchedulingRequest(2 * GB, 2, queueName, "default",
         NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
-    starvingApp = scheduler.getSchedulerApp(appAttemptId2);
+    starvingApp = scheduler.getSchedulerApp(appAttemptId);
 
     // Sleep long enough to pass
     Thread.sleep(10);
-
     scheduler.update();
   }
 
+  /**
+   * Set up the standard preemption scenario in two steps: an application
+   * submitted to {@code queue1} takes over the entire cluster, then an
+   * application submitted to {@code queue2} asks for larger containers,
+   * which requires preemption from the first application.
+   *
+   * @param queue1 queue of the application that takes all the resources
+   * @param queue2 queue of the application that needs preemption
+   * @throws InterruptedException if interrupted while waiting
+   */
+  private void submitApps(String queue1, String queue2)
+      throws InterruptedException {
+    // The cluster is filled first; only then is the second app submitted.
+    takeAllResource(queue1);
+    preemptHalfResources(queue2);
+  }
+
   private void verifyPreemption() throws InterruptedException {
     // Sleep long enough for four containers to be preempted. Note that the
     // starved app must be queued four times for containers to be preempted.
@@ -285,4 +309,43 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
     submitApps("root.nonpreemptable.child-1", "root.preemptable.child-1");
     verifyNoPreemption();
   }
+
+  /**
+   * Flag a fixed number of the containers running on every node as AM
+   * containers.
+   *
+   * @param numAMContainersPerNode number of AM containers per node
+   */
+  private void setNumAMContainersPerNode(int numAMContainersPerNode) {
+    for (FSSchedulerNode schedulerNode
+        : scheduler.getNodeTracker().getNodesByResourceName("*")) {
+      List<RMContainer> running =
+          schedulerNode.getCopiedListOfRunningContainers();
+      // Promote the leading numAMContainersPerNode entries of the copied
+      // list to AM containers.
+      int promoted = 0;
+      while (promoted < numAMContainersPerNode) {
+        RMContainerImpl target = (RMContainerImpl) running.get(promoted);
+        target.setAMContainer(true);
+        promoted++;
+      }
+    }
+  }
+
+  /**
+   * Verify that preemption picks non-AM containers: with two of the
+   * containers on each node flagged as AM containers, the containers handed
+   * to the starving application must come from two different nodes.
+   */
+  @Test
+  public void testPreemptionSelectNonAMContainer() throws Exception {
+    setupCluster();
+
+    takeAllResource("root.preemptable.child-1");
+    setNumAMContainersPerNode(2);
+    preemptHalfResources("root.preemptable.child-2");
+
+    verifyPreemption();
+
+    // Copy into a list rather than downcasting, so the test does not depend
+    // on the concrete collection type returned by getLiveContainers().
+    List<RMContainer> containers =
+        new ArrayList<>(starvingApp.getLiveContainers());
+    String host0 = containers.get(0).getNodeId().getHost();
+    String host1 = containers.get(1).getNodeId().getHost();
+    // Each node provides two and only two non-AM containers to be preempted,
+    // so the preemption happens on both nodes.
+    assertFalse("Preempted containers should come from two different "
+        + "nodes.", host0.equals(host1));
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to