This is an automated email from the ASF dual-hosted git repository.

templedf pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 538bb48  YARN-9323. FSLeafQueue#computeMaxAMResource does not override zero values for custom resources (Contributed by Szilard Nemeth via Daniel Templeton)
538bb48 is described below

commit 538bb4880da43714af68143e9a1dde195bd77099
Author: Szilard Nemeth <snem...@cloudera.com>
AuthorDate: Wed Feb 27 19:49:34 2019 -0800

    YARN-9323. FSLeafQueue#computeMaxAMResource does not override zero values for custom resources
    (Contributed by Szilard Nemeth via Daniel Templeton)
    
    Change-Id: Id844ccf09488f367c0c7de0a3b2d4aca1bba31cc
---
 .../resourcemanager/scheduler/QueueMetrics.java    |   4 +
 .../scheduler/QueueMetricsForCustomResources.java  |   4 +
 .../scheduler/fair/FSLeafQueue.java                |  26 ++++
 .../scheduler/fair/FSQueueMetrics.java             |   4 +
 .../scheduler/fair/TestFSLeafQueue.java            | 134 +++++++++++++++++++++
 5 files changed, 172 insertions(+)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
index 0a01c60..d126f09 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
@@ -832,4 +832,8 @@ public class QueueMetrics implements MetricsSource {
   public long getAggregatePreemptedContainers() {
     return aggregateContainersPreempted.value();
   }
+
+  public QueueMetricsForCustomResources getQueueMetricsForCustomResources() {
+    return queueMetricsForCustomResources;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java
index e8c8897..3470858 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java
@@ -101,4 +101,8 @@ public class QueueMetricsForCustomResources {
   QueueMetricsCustomResource getAggregatePreemptedSeconds() {
     return aggregatePreemptedSeconds;
   }
+
+  public QueueMetricsCustomResource getAvailable() {
+    return available;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index 3deddee..044254d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -42,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetricsCustomResource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetricsForCustomResources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -517,6 +520,29 @@ public class FSLeafQueue extends FSQueue {
           getMaxShare().getVirtualCores()));
     }
 
+    QueueMetricsForCustomResources metricsForCustomResources =
+        scheduler.getRootQueueMetrics().getQueueMetricsForCustomResources();
+
+    if (metricsForCustomResources != null) {
+      QueueMetricsCustomResource availableResources =
+          metricsForCustomResources.getAvailable();
+
+      // We expect all custom resources contained in availableResources,
+      // so we will loop through all of them.
+      for (Map.Entry<String, Long> availableEntry : availableResources
+          .getValues().entrySet()) {
+        String resourceName = availableEntry.getKey();
+
+        // We only update the value if fairshare is 0 for that resource.
+        if (maxResource.getResourceValue(resourceName) == 0) {
+          Long availableValue = availableEntry.getValue();
+          long value = Math.min(availableValue,
+              getMaxShare().getResourceValue(resourceName));
+          maxResource.setResourceValue(resourceName, value);
+        }
+      }
+    }
+
     // Round up to allow AM to run when there is only one vcore on the cluster
     return Resources.multiplyAndRoundUp(maxResource, maxAMShare);
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java
index cfea492..d0ddd42 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java
@@ -310,4 +310,8 @@ public class FSQueueMetrics extends QueueMetrics {
 
     return (FSQueueMetrics)metrics;
   }
+
+  FSQueueMetricsForCustomResources getCustomResources() {
+    return customResources;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
index 4a738ca..0cf1a7b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
@@ -34,6 +34,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -42,19 +43,26 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetricsCustomResource;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
+import java.util.Map;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 
 public class TestFSLeafQueue extends FairSchedulerTestBase {
   private final static String ALLOC_FILE = new File(TEST_DIR,
       TestFSLeafQueue.class.getName() + ".xml").getAbsolutePath();
   private Resource maxResource = Resources.createResource(1024 * 8);
+  private static final float MAX_AM_SHARE = 0.5f;
+  private static final String CUSTOM_RESOURCE = "test1";
 
   @Before
   public void setup() throws IOException {
@@ -105,6 +113,8 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
     out.println("<allocations>");
+    out.println("<queueMaxAMShareDefault>" + MAX_AM_SHARE +
+        "</queueMaxAMShareDefault>");
     out.println("<queue name=\"queueA\"></queue>");
     out.println("<queue name=\"queueB\"></queue>");
     out.println("</allocations>");
@@ -221,4 +231,128 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
     assertTrue("Test failed with exception(s)" + exceptions,
         exceptions.isEmpty());
   }
+
+  @Test
+  public void testCanRunAppAMReturnsTrue() {
+    conf.set(YarnConfiguration.RESOURCE_TYPES, CUSTOM_RESOURCE);
+    ResourceUtils.resetResourceTypes(conf);
+
+    resourceManager = new MockRM(conf);
+    resourceManager.start();
+    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+
+    Resource maxShare = Resource.newInstance(1024 * 8, 4,
+        ImmutableMap.of(CUSTOM_RESOURCE, 10L));
+
+    // Add a node to increase available memory and vcores in scheduler's
+    // root queue metrics
+    addNodeToScheduler(Resource.newInstance(4096, 10,
+        ImmutableMap.of(CUSTOM_RESOURCE, 25L)));
+
+    FSLeafQueue queue = setupQueue(maxShare);
+
+    //Min(availableMemory, maxShareMemory (maxResourceOverridden))
+    // --> Min(4096, 8192) = 4096
+    //Min(availableVCores, maxShareVCores (maxResourceOverridden))
+    // --> Min(10, 4) = 4
+    //Min(available test1, maxShare test1 (maxResourceOverridden))
+    // --> Min(25, 10) = 10
+    //MaxAMResource: (4096 MB memory, 4 vcores, 10 test1) * MAX_AM_SHARE
+    // --> 2048 MB memory, 2 vcores, 5 test1
+    Resource expectedAMShare = Resource.newInstance(2048, 2,
+        ImmutableMap.of(CUSTOM_RESOURCE, 5L));
+
+    Resource appAMResource = Resource.newInstance(2048, 2,
+        ImmutableMap.of(CUSTOM_RESOURCE, 3L));
+
+    Map<String, Long> customResourceValues =
+        verifyQueueMetricsForCustomResources(queue);
+
+    boolean result = queue.canRunAppAM(appAMResource);
+    assertTrue("AM should have been allocated!", result);
+
+    verifyAMShare(queue, expectedAMShare, customResourceValues);
+  }
+
+  private FSLeafQueue setupQueue(Resource maxShare) {
+    String queueName = "root.queue1";
+    FSLeafQueue schedulable = new FSLeafQueue(queueName, scheduler, null);
+    schedulable.setMaxShare(new ConfigurableResource(maxShare));
+    schedulable.setMaxAMShare(MAX_AM_SHARE);
+    return schedulable;
+  }
+
+  @Test
+  public void testCanRunAppAMReturnsFalse() {
+    conf.set(YarnConfiguration.RESOURCE_TYPES, CUSTOM_RESOURCE);
+    ResourceUtils.resetResourceTypes(conf);
+
+    resourceManager = new MockRM(conf);
+    resourceManager.start();
+    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+
+    Resource maxShare = Resource.newInstance(1024 * 8, 4,
+        ImmutableMap.of(CUSTOM_RESOURCE, 10L));
+
+    // Add a node to increase available memory and vcores in scheduler's
+    // root queue metrics
+    addNodeToScheduler(Resource.newInstance(4096, 10,
+        ImmutableMap.of(CUSTOM_RESOURCE, 25L)));
+
+    FSLeafQueue queue = setupQueue(maxShare);
+
+    //Min(availableMemory, maxShareMemory (maxResourceOverridden))
+    // --> Min(4096, 8192) = 4096
+    //Min(availableVCores, maxShareVCores (maxResourceOverridden))
+    // --> Min(10, 4) = 4
+    //Min(available test1, maxShare test1 (maxResourceOverridden))
+    // --> Min(25, 10) = 10
+    //MaxAMResource: (4096 MB memory, 4 vcores, 10 test1) * MAX_AM_SHARE
+    // --> 2048 MB memory, 2 vcores, 5 test1
+    Resource expectedAMShare = Resource.newInstance(2048, 2,
+        ImmutableMap.of(CUSTOM_RESOURCE, 5L));
+
+    Resource appAMResource = Resource.newInstance(2048, 2,
+        ImmutableMap.of(CUSTOM_RESOURCE, 6L));
+
+    Map<String, Long> customResourceValues =
+        verifyQueueMetricsForCustomResources(queue);
+
+    boolean result = queue.canRunAppAM(appAMResource);
+    assertFalse("AM should not have been allocated!", result);
+
+    verifyAMShare(queue, expectedAMShare, customResourceValues);
+  }
+
+  private void addNodeToScheduler(Resource node1Resource) {
+    RMNode node1 = MockNodes.newNodeInfo(0, node1Resource, 1, "127.0.0.2");
+    scheduler.handle(new NodeAddedSchedulerEvent(node1));
+  }
+
+  private void verifyAMShare(FSLeafQueue schedulable,
+      Resource expectedAMShare, Map<String, Long> customResourceValues) {
+    Resource actualAMShare = Resource.newInstance(
+        schedulable.getMetrics().getMaxAMShareMB(),
+        schedulable.getMetrics().getMaxAMShareVCores(), customResourceValues);
+    long customResourceValue =
+        actualAMShare.getResourceValue(CUSTOM_RESOURCE);
+
+    //make sure to verify custom resource value explicitly!
+    assertEquals(5L, customResourceValue);
+    assertEquals("AM share is not the expected!", expectedAMShare,
+        actualAMShare);
+  }
+
+  private Map<String, Long> verifyQueueMetricsForCustomResources(
+      FSLeafQueue schedulable) {
+    QueueMetricsCustomResource maxAMShareCustomResources =
+        schedulable.getMetrics().getCustomResources().getMaxAMShare();
+    Map<String, Long> customResourceValues = maxAMShareCustomResources
+        .getValues();
+    assertNotNull("Queue metrics for custom resources should not be null!",
+        maxAMShareCustomResources);
+    assertNotNull("Queue metrics for custom resources resource values " +
+        "should not be null!", customResourceValues);
+    return customResourceValues;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to