YARN-6156. AM blacklisting to consider node label partition (Bibin A Chundatt via Varun Saxena)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b7613e0f Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b7613e0f Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b7613e0f Branch: refs/heads/HDFS-10285 Commit: b7613e0f406fb2b9bd5b1b3c79658e801f63c587 Parents: cd3e59a Author: Varun Saxena <varunsax...@apache.org> Authored: Wed Feb 15 14:48:17 2017 +0530 Committer: Varun Saxena <varunsax...@apache.org> Committed: Wed Feb 15 14:48:17 2017 +0530 ---------------------------------------------------------------------- .../server/resourcemanager/RMServerUtils.java | 22 ++++++ .../nodelabels/RMNodeLabelsManager.java | 16 +++++ .../server/resourcemanager/rmapp/RMAppImpl.java | 12 ++-- .../rmapp/attempt/RMAppAttemptImpl.java | 4 +- .../TestCapacitySchedulerNodeLabelUpdate.java | 73 ++++++++++++++++++++ 5 files changed, 118 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b7613e0f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index 224a1da..e98141b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt .RMAppAttemptState; @@ -561,4 +562,25 @@ public class RMServerUtils { } return newApplicationTimeout; } + + /** + * Get the number of nodes an AM container may be placed on. + * + * @param rmContext RM context (node label manager, scheduler) + * @param conf configuration; decides whether node labels are enabled + * @param amreq AM resource request; label expression picks the partition + * @return active NM count of the AM's partition, else cluster node count + */ + public static int getApplicableNodeCountForAM(RMContext rmContext, + Configuration conf, ResourceRequest amreq) { + if (YarnConfiguration.areNodeLabelsEnabled(conf)) { + RMNodeLabelsManager labelManager = rmContext.getNodeLabelManager(); + String amNodeLabelExpression = amreq.getNodeLabelExpression(); + amNodeLabelExpression = (amNodeLabelExpression == null + || amNodeLabelExpression.trim().isEmpty()) + ?
RMNodeLabelsManager.NO_LABEL : amNodeLabelExpression.trim(); + return labelManager.getActiveNMCountPerLabel(amNodeLabelExpression); + } + return rmContext.getScheduler().getNumClusterNodes(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b7613e0f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java index 5dc8392..effe422 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java @@ -350,6 +350,22 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager { } } + /** + * Get the number of active NMs in a partition; 0 for null/unknown labels. + */ + public int getActiveNMCountPerLabel(String label) { + if (label == null) { + return 0; + } + readLock.lock(); + try { + RMNodeLabel labelInfo = labelCollections.get(label); + return (labelInfo == null) ?
0 : labelInfo.getNumActiveNMs(); + } finally { + readLock.unlock(); + } + } + public Set<String> getLabelsOnNode(NodeId nodeId) { try { readLock.lock(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b7613e0f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 12ece3f..516109b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -18,11 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; -import java.io.IOException; import java.net.InetAddress; import java.net.URI; import java.net.URISyntaxException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -46,9 +44,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.ipc.CallerContext; -import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import
org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -987,9 +983,11 @@ public class RMAppImpl implements RMApp, Recoverable { // Transfer over the blacklist from the previous app-attempt. currentAMBlacklistManager = currentAttempt.getAMBlacklistManager(); } else { - if (amBlacklistingEnabled) { + if (amBlacklistingEnabled && !submissionContext.getUnmanagedAM()) { /* Unmanaged AMs fall through to DisabledBlacklistManager, so getAMResourceRequest() is only consulted for managed AMs. NOTE(review): presumably unmanaged AMs have no RM-placed AM container -- confirm */ currentAMBlacklistManager = new SimpleBlacklistManager( - scheduler.getNumClusterNodes(), blacklistDisableThreshold); + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, + getAMResourceRequest()), + blacklistDisableThreshold); } else { currentAMBlacklistManager = new DisabledBlacklistManager(); } @@ -1006,7 +1004,7 @@ public class RMAppImpl implements RMApp, Recoverable { attempts.put(appAttemptId, attempt); currentAttempt = attempt; } - + private void createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) { createNewAttempt(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b7613e0f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index ab84985..1788722 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -1057,7 +1057,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { appAttempt.amReq.setRelaxLocality(true); appAttempt.getAMBlacklistManager().refreshNodeHostCount( - appAttempt.scheduler.getNumClusterNodes()); + /* refresh with the node count of the AM's partition (cluster node count when node labels are disabled) */ RMServerUtils.getApplicableNodeCountForAM(appAttempt.rmContext, + appAttempt.conf, appAttempt.amReq)); ResourceBlacklistRequest amBlacklist = appAttempt.getAMBlacklistManager().getBlacklistUpdates(); @@ -1246,7 +1247,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { } } - private void rememberTargetTransitions(RMAppAttemptEvent event, Object transitionToDo, RMAppAttemptState targetFinalState) { transitionTodo = transitionToDo; http://git-wip-us.apache.org/repos/asf/hadoop/blob/b7613e0f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java index 732b5d1..b4ebd15 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java +++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; @@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -721,6 +723,77 @@ public class TestCapacitySchedulerNodeLabelUpdate { rm.close(); } + @Test(timeout = 30000) + public void testBlacklistAMDisableLabel() throws Exception { + conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED, + true); + conf.setFloat( + YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD, + 0.5f); + mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y")); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("x"), + NodeId.newInstance("h3", 0), toSet("x"), NodeId.newInstance("h6", 0), + toSet("x"))); +
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h4", 0), toSet("y"), + NodeId.newInstance("h5", 0), toSet("y"), NodeId.newInstance("h7", 0), + toSet("y"))); + + MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + // Nodes in label default h1,h8,h9 + // Nodes in label x h2,h3,h6 + // Nodes in label y h4,h5,h7 + MockNM nm1 = rm.registerNode("h1:1234", 2048); + MockNM nm2 = rm.registerNode("h2:1234", 2048); + rm.registerNode("h3:1234", 2048); + rm.registerNode("h4:1234", 2048); + rm.registerNode("h5:1234", 2048); + rm.registerNode("h6:1234", 2048); + rm.registerNode("h7:1234", 2048); + rm.registerNode("h8:1234", 2048); + rm.registerNode("h9:1234", 2048); + + // Submit app with AM container launched on default partition i.e. h1. + RMApp app = rm.submitApp(GB, "app", "user", null, "a"); + MockRM.launchAndRegisterAM(app, rm, nm1); + RMAppAttempt appAttempt = app.getCurrentAppAttempt(); + // Blacklist a node from the default partition + appAttempt.getAMBlacklistManager().addNode("h1"); + ResourceBlacklistRequest blacklistUpdates = + appAttempt.getAMBlacklistManager().getBlacklistUpdates(); + Assert.assertEquals(1, blacklistUpdates.getBlacklistAdditions().size()); + Assert.assertEquals(0, blacklistUpdates.getBlacklistRemovals().size()); + // Adding second node from default partition + appAttempt.getAMBlacklistManager().addNode("h8"); + blacklistUpdates = appAttempt.getAMBlacklistManager().getBlacklistUpdates(); + Assert.assertEquals(0, blacklistUpdates.getBlacklistAdditions().size()); + Assert.assertEquals(2, blacklistUpdates.getBlacklistRemovals().size()); + + // Submission in label x + RMApp applabel = rm.submitApp(GB, "app", "user", null, "a", "x"); + MockRM.launchAndRegisterAM(applabel, rm, nm2); + RMAppAttempt appAttemptlabelx = applabel.getCurrentAppAttempt(); +
appAttemptlabelx.getAMBlacklistManager().addNode("h2"); + ResourceBlacklistRequest blacklistUpdatesOnx = + appAttemptlabelx.getAMBlacklistManager().getBlacklistUpdates(); + Assert.assertEquals(1, blacklistUpdatesOnx.getBlacklistAdditions().size()); + Assert.assertEquals(0, blacklistUpdatesOnx.getBlacklistRemovals().size()); + // Adding second node from label x partition + appAttemptlabelx.getAMBlacklistManager().addNode("h3"); + blacklistUpdatesOnx = + appAttemptlabelx.getAMBlacklistManager().getBlacklistUpdates(); + Assert.assertEquals(0, blacklistUpdatesOnx.getBlacklistAdditions().size()); + Assert.assertEquals(2, blacklistUpdatesOnx.getBlacklistRemovals().size()); + + rm.close(); + } + private void checkAMResourceLimit(MockRM rm, String queuename, int memory, String label) throws InterruptedException { Assert.assertEquals(memory, --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org