Repository: hadoop
Updated Branches:
  refs/heads/branch-3.0 2c2ff7da0 -> 40372be7b


YARN-7541. Node updates don't update the maximum cluster capability for 
resources other than CPU and memory

(cherry picked from commit 8498d287cd3beddcf8fe19625227e09982ec4be2)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/40372be7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/40372be7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/40372be7

Branch: refs/heads/branch-3.0
Commit: 40372be7bb2b08dcfd94d077233683ddb1edace1
Parents: 2c2ff7d
Author: Daniel Templeton <templ...@apache.org>
Authored: Wed Nov 29 10:36:19 2017 -0800
Committer: Daniel Templeton <templ...@apache.org>
Committed: Thu Nov 30 09:26:25 2017 -0800

----------------------------------------------------------------------
 .../hadoop/yarn/api/records/Resource.java       |  21 ++++
 .../yarn/util/resource/ResourceUtils.java       |  31 ++++-
 .../scheduler/ClusterNodeTracker.java           |  79 ++++++++----
 .../yarn/server/resourcemanager/MockNodes.java  |   4 +
 .../scheduler/TestClusterNodeTracker.java       | 125 ++++++++++++++++++-
 5 files changed, 230 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/40372be7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
index e863d68..5f1455f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.api.records;
 
 import java.util.Arrays;
+import java.util.Map;
 
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -84,6 +85,26 @@ public abstract class Resource implements Comparable<Resource> {
     return new LightWeightResource(memory, vCores);
   }
 
+  /**
+   * Create a new {@link Resource} instance with the given CPU and memory
+   * values and additional resource values as set in the {@code others}
+   * parameter. Note that the CPU and memory settings in the {@code others}
+   * parameter will be ignored.
+   *
+   * @param memory the memory value
+   * @param vCores the CPU value
+   * @param others a map of other resource values indexed by resource name
+   * @return a {@link Resource} instance with the given resource values
+   */
+  @Public
+  @Stable
+  public static Resource newInstance(long memory, int vCores,
+      Map<String, Long> others) {
+    ResourceInformation[] info = ResourceUtils.createResourceTypesArray(others);
+
+    return new LightWeightResource(memory, vCores, info);
+  }
+
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
   public static Resource newInstance(Resource resource) {

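As a point of reference, here is a minimal usage sketch of the new overload. The resource name "gpu" is hypothetical; it would have to be registered as an extra resource type (via yarn.resource-types) for the extra value to take effect, and keys in the map that are not registered types are simply ignored.

    import java.util.Collections;

    import org.apache.hadoop.yarn.api.records.Resource;

    public class ResourceNewInstanceDemo {
      public static void main(String[] args) {
        // Memory and vCores come from the first two arguments; the map only
        // supplies values for additional registered resource types ("gpu" is
        // a hypothetical example here).
        Resource capability = Resource.newInstance(4096, 4,
            Collections.singletonMap("gpu", 2L));

        System.out.println(capability);
      }
    }
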
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40372be7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
index ddd3901..571b73e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
@@ -609,7 +609,6 @@ public class ResourceUtils {
     return result;
   }
 
-
   /**
    * Reinitialize all resource types from external source (in case of client,
    * server will send the updated list and local resourceutils cache will be
@@ -630,4 +629,34 @@ public class ResourceUtils {
     ResourceUtils
         .initializeResourcesFromResourceInformationMap(resourceInformationMap);
   }
+
+  /**
+   * Create an array of {@link ResourceInformation} objects corresponding to
+   * the passed in map of names to values. The array will be ordered according
+   * to the order returned by {@link #getResourceTypesArray()}. The value of
+   * each resource type in the returned array will either be the value given for
+   * that resource in the {@code res} parameter or, if none is given, 0.
+   *
+   * @param res the map of resource type values
+   * @return an array of {@link ResourceInformation} instances
+   */
+  public static ResourceInformation[] createResourceTypesArray(Map<String,
+      Long> res) {
+    ResourceInformation[] info = new ResourceInformation[resourceTypes.size()];
+
+    for (Entry<String, Integer> entry : RESOURCE_NAME_TO_INDEX.entrySet()) {
+      int index = entry.getValue();
+      Long value = res.get(entry.getKey());
+
+      if (value == null) {
+        value = 0L;
+      }
+
+      info[index] = new ResourceInformation();
+      ResourceInformation.copy(resourceTypesArray[index], info[index]);
+      info[index].setValue(value);
+    }
+
+    return info;
+  }
 }

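A quick sketch of the defaulting behaviour described in the javadoc above, assuming an extra resource type named "test1" has been registered (as the new test below does via yarn.resource-types): types missing from the input map come back with a value of 0, and the array order matches getResourceTypesArray().

    import java.util.Collections;

    import org.apache.hadoop.yarn.api.records.ResourceInformation;
    import org.apache.hadoop.yarn.util.resource.ResourceUtils;

    public class CreateResourceTypesArrayDemo {
      public static void main(String[] args) {
        // Only "test1" is given a value here, so memory-mb and vcores are
        // returned with a value of 0.
        ResourceInformation[] info = ResourceUtils.createResourceTypesArray(
            Collections.singletonMap("test1", 8L));

        for (ResourceInformation ri : info) {
          System.out.println(ri.getName() + " = " + ri.getValue());
        }
      }
    }
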
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40372be7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
index 9e54ac6..66d8810 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
@@ -24,11 +24,14 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -60,11 +63,16 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
       Resources.clone(Resources.none());
 
   // Max allocation
-  private long maxNodeMemory = -1;
-  private int maxNodeVCores = -1;
+  private final long[] maxAllocation;
   private Resource configuredMaxAllocation;
   private boolean forceConfiguredMaxAllocation = true;
   private long configuredMaxAllocationWaitTime;
+  private boolean reportedMaxAllocation = false;
+
+  public ClusterNodeTracker() {
+    maxAllocation = new long[ResourceUtils.getNumberOfKnownResourceTypes()];
+    Arrays.fill(maxAllocation, -1);
+  }
 
   public void addNode(N node) {
     writeLock.lock();
@@ -208,17 +216,18 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
         forceConfiguredMaxAllocation = false;
       }
 
-      if (forceConfiguredMaxAllocation
-          || maxNodeMemory == -1 || maxNodeVCores == -1) {
+      if (forceConfiguredMaxAllocation || !reportedMaxAllocation) {
         return configuredMaxAllocation;
       }
 
       Resource ret = Resources.clone(configuredMaxAllocation);
-      if (ret.getMemorySize() > maxNodeMemory) {
-        ret.setMemorySize(maxNodeMemory);
-      }
-      if (ret.getVirtualCores() > maxNodeVCores) {
-        ret.setVirtualCores(maxNodeVCores);
+
+      for (int i = 0; i < maxAllocation.length; i++) {
+        ResourceInformation info = ret.getResourceInformation(i);
+
+        if (info.getValue() > maxAllocation[i]) {
+          info.setValue(maxAllocation[i]);
+        }
       }
 
       return ret;
@@ -229,31 +238,51 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
 
   private void updateMaxResources(SchedulerNode node, boolean add) {
     Resource totalResource = node.getTotalResource();
+    ResourceInformation[] totalResources;
+
+    if (totalResource != null) {
+      totalResources = totalResource.getResources();
+    } else {
+      LOG.warn(node.getNodeName() + " reported in with null resources, which "
+          + "indicates a problem in the source code. Please file an issue at "
+          + "https://issues.apache.org/jira/secure/CreateIssue!default.jspa");
+
+      return;
+    }
+
     writeLock.lock();
+
     try {
       if (add) { // added node
-        long nodeMemory = totalResource.getMemorySize();
-        if (nodeMemory > maxNodeMemory) {
-          maxNodeMemory = nodeMemory;
-        }
-        int nodeVCores = totalResource.getVirtualCores();
-        if (nodeVCores > maxNodeVCores) {
-          maxNodeVCores = nodeVCores;
+        // If we add a node, we must have a max allocation for all resource
+        // types
+        reportedMaxAllocation = true;
+
+        for (int i = 0; i < maxAllocation.length; i++) {
+          long value = totalResources[i].getValue();
+
+          if (value > maxAllocation[i]) {
+            maxAllocation[i] = value;
+          }
         }
       } else {  // removed node
-        if (maxNodeMemory == totalResource.getMemorySize()) {
-          maxNodeMemory = -1;
-        }
-        if (maxNodeVCores == totalResource.getVirtualCores()) {
-          maxNodeVCores = -1;
+        boolean recalculate = false;
+
+        for (int i = 0; i < maxAllocation.length; i++) {
+          if (totalResources[i].getValue() == maxAllocation[i]) {
+            // No need to set reportedMaxAllocation to false here because we
+            // will recalculate before we release the lock.
+            maxAllocation[i] = -1;
+            recalculate = true;
+          }
         }
+
         // We only have to iterate through the nodes if the current max memory
         // or vcores was equal to the removed node's
-        if (maxNodeMemory == -1 || maxNodeVCores == -1) {
+        if (recalculate) {
           // Treat it like an empty cluster and add nodes
-          for (N n : nodes.values()) {
-            updateMaxResources(n, true);
-          }
+          reportedMaxAllocation = false;
+          nodes.values().forEach(n -> updateMaxResources(n, true));
         }
       }
     } finally {

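The gist of the change above: the tracker keeps the per-resource maximum in an array indexed like the known resource types, raises entries as nodes are added, and rebuilds the array from the remaining nodes whenever a removed node held any current maximum. A simplified, self-contained sketch of that pattern (plain long[] vectors stand in for node resources; this is an illustration, not the RM code):

    import java.util.Arrays;
    import java.util.Collection;

    class PerResourceMaxTracker {
      private final long[] max;

      PerResourceMaxTracker(int numResourceTypes) {
        max = new long[numResourceTypes];
        Arrays.fill(max, -1);                // -1 means "no node reported yet"
      }

      void addNode(long[] nodeResources) {
        for (int i = 0; i < max.length; i++) {
          max[i] = Math.max(max[i], nodeResources[i]);
        }
      }

      void removeNode(long[] nodeResources, Collection<long[]> remainingNodes) {
        boolean recalculate = false;
        for (int i = 0; i < max.length; i++) {
          if (nodeResources[i] == max[i]) {  // removed node held a current max
            max[i] = -1;
            recalculate = true;
          }
        }
        if (recalculate) {                   // rebuild from the surviving nodes
          remainingNodes.forEach(this::addNode);
        }
      }
    }
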
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40372be7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 611c7f2..317c648 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -49,6 +49,10 @@ public class MockNodes {
   private static int NODE_ID = 0;
   private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
 
+  public static void resetHostIds() {
+    NODE_ID = 0;
+  }
+
   public static List<RMNode> newNodes(int racks, int nodesPerRack,
                                         Resource perNode) {
     List<RMNode> list = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40372be7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java
index 7f527f1..c1703bc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java
@@ -17,16 +17,21 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.List;
-
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -34,11 +39,15 @@ import static org.junit.Assert.assertEquals;
  * loss of generality.
  */
 public class TestClusterNodeTracker {
-  private ClusterNodeTracker<FSSchedulerNode> nodeTracker =
-      new ClusterNodeTracker<>();
+  private ClusterNodeTracker<FSSchedulerNode> nodeTracker;
 
   @Before
   public void setup() {
+    nodeTracker = new ClusterNodeTracker<>();
+  }
+
+  private void addEight4x4Nodes() {
+    MockNodes.resetHostIds();
     List<RMNode> rmNodes =
         MockNodes.newNodes(2, 4, Resource.newInstance(4096, 4));
     for (RMNode rmNode : rmNodes) {
@@ -48,6 +57,7 @@ public class TestClusterNodeTracker {
 
   @Test
   public void testGetNodeCount() {
+    addEight4x4Nodes();
     assertEquals("Incorrect number of nodes in the cluster",
         8, nodeTracker.nodeCount());
 
@@ -57,6 +67,7 @@ public class TestClusterNodeTracker {
 
   @Test
   public void testGetNodesForResourceName() throws Exception {
+    addEight4x4Nodes();
     assertEquals("Incorrect number of nodes matching ANY",
         8, nodeTracker.getNodesByResourceName(ResourceRequest.ANY).size());
 
@@ -66,4 +77,110 @@ public class TestClusterNodeTracker {
     assertEquals("Incorrect number of nodes matching node",
         1, nodeTracker.getNodesByResourceName("host0").size());
   }
+
+  @Test
+  public void testMaxAllowedAllocation() {
+    // Add a third resource
+    Configuration conf = new Configuration();
+
+    conf.set(YarnConfiguration.RESOURCE_TYPES, "test1");
+
+    ResourceUtils.resetResourceTypes(conf);
+    setup();
+
+    Resource maximum = Resource.newInstance(10240, 10,
+        Collections.singletonMap("test1", 10L));
+
+    nodeTracker.setConfiguredMaxAllocation(maximum);
+
+    Resource result = nodeTracker.getMaxAllowedAllocation();
+
+    assertEquals("With no nodes added, the ClusterNodeTracker did not return "
+        + "the configured max allocation", maximum, result);
+
+    List<RMNode> smallNodes =
+        MockNodes.newNodes(1, 1, Resource.newInstance(1024, 2,
+            Collections.singletonMap("test1", 4L)));
+    FSSchedulerNode smallNode = new FSSchedulerNode(smallNodes.get(0), false);
+    List<RMNode> mediumNodes =
+        MockNodes.newNodes(1, 1, Resource.newInstance(4096, 2,
+            Collections.singletonMap("test1", 2L)));
+    FSSchedulerNode mediumNode = new FSSchedulerNode(mediumNodes.get(0), false);
+    List<RMNode> largeNodes =
+        MockNodes.newNodes(1, 1, Resource.newInstance(16384, 4,
+            Collections.singletonMap("test1", 1L)));
+    FSSchedulerNode largeNode = new FSSchedulerNode(largeNodes.get(0), false);
+
+    nodeTracker.addNode(mediumNode);
+
+    result = nodeTracker.getMaxAllowedAllocation();
+
+    assertEquals("With a single node added, the ClusterNodeTracker did not "
+        + "return that node's resources as the maximum allocation",
+        mediumNodes.get(0).getTotalCapability(), result);
+
+    nodeTracker.addNode(smallNode);
+
+    result = nodeTracker.getMaxAllowedAllocation();
+
+    assertEquals("With two nodes added, the ClusterNodeTracker did not "
+        + "return the maximum allocation that was the max of their aggregate "
+        + "resources",
+        Resource.newInstance(4096, 2, Collections.singletonMap("test1", 4L)),
+        result);
+
+    nodeTracker.removeNode(smallNode.getNodeID());
+
+    result = nodeTracker.getMaxAllowedAllocation();
+
+    assertEquals("After removing a node, the ClusterNodeTracker did not "
+        + "recalculate the adjusted maximum allocation correctly",
+        mediumNodes.get(0).getTotalCapability(), result);
+
+    nodeTracker.addNode(largeNode);
+
+    result = nodeTracker.getMaxAllowedAllocation();
+
+    assertEquals("With two nodes added, the ClusterNodeTracker did not "
+        + "return the maximum allocation that was the max of their aggregate "
+        + "resources",
+        Resource.newInstance(10240, 4, Collections.singletonMap("test1", 2L)),
+        result);
+
+    nodeTracker.removeNode(largeNode.getNodeID());
+
+    result = nodeTracker.getMaxAllowedAllocation();
+
+    assertEquals("After removing a node, the ClusterNodeTracker did not "
+        + "recalculate the adjusted maximum allocation correctly",
+        mediumNodes.get(0).getTotalCapability(), result);
+
+    nodeTracker.removeNode(mediumNode.getNodeID());
+
+    result = nodeTracker.getMaxAllowedAllocation();
+
+    assertEquals("After removing all nodes, the ClusterNodeTracker did not "
+        + "return the configured maximum allocation", maximum, result);
+
+    nodeTracker.addNode(smallNode);
+    nodeTracker.addNode(mediumNode);
+    nodeTracker.addNode(largeNode);
+
+    result = nodeTracker.getMaxAllowedAllocation();
+
+    assertEquals("With three nodes added, the ClusterNodeTracker did not "
+        + "return the maximum allocation that was the max of their aggregate "
+        + "resources",
+        Resource.newInstance(10240, 4, Collections.singletonMap("test1", 4L)),
+        result);
+
+    nodeTracker.removeNode(smallNode.getNodeID());
+    nodeTracker.removeNode(mediumNode.getNodeID());
+    nodeTracker.removeNode(largeNode.getNodeID());
+
+    result = nodeTracker.getMaxAllowedAllocation();
+
+    assertEquals("After removing all nodes, the ClusterNodeTracker did not "
+        + "return the configured maximum allocation", maximum, result);
+  }
 }
\ No newline at end of file

