YARN-5110. Fix OpportunisticContainerAllocator to insert complete HostAddress in issued ContainerTokenIds. (Konstantinos Karanasos via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/15976306 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/15976306 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/15976306 Branch: refs/heads/YARN-4757 Commit: 1597630681c784a3d59f5605b87e96197b8139d7 Parents: 010e6ac Author: Arun Suresh <asur...@apache.org> Authored: Wed May 18 18:46:00 2016 -0700 Committer: Arun Suresh <asur...@apache.org> Committed: Wed May 18 18:46:00 2016 -0700 ---------------------------------------------------------------------- .../api/records/QueuedContainersStatus.java | 2 +- .../queuing/QueuingContainerManagerImpl.java | 1 + .../nodemanager/scheduler/LocalScheduler.java | 6 +- .../OpportunisticContainerAllocator.java | 17 +++--- .../scheduler/TestLocalScheduler.java | 9 ++- .../distributed/NodeQueueLoadMonitor.java | 14 +++-- .../hadoop/yarn/server/MiniYARNCluster.java | 62 +++++++++++++++++++- 7 files changed, 91 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/15976306/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java index a7f0ece..fb567d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java @@ -41,5 +41,5 @@ public abstract class QueuedContainersStatus { public abstract int getWaitQueueLength(); - public abstract void setWaitQueueLength(int queueWaitTime); + public abstract void setWaitQueueLength(int waitQueueLength); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/15976306/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java index 4f9d5a3..707051f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java @@ -200,6 +200,7 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl { .getContainerTokenIdentifier().getContainerID(); this.context.getQueuingContext().getQueuedContainers().remove(containerId); try { + LOG.info("Starting container [" + containerId + "]"); super.startContainerInternal( allocatedContainerInfo.getContainerTokenIdentifier(), allocatedContainerInfo.getStartRequest()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/15976306/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java index 42c1dcd..fca814b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java @@ -337,8 +337,10 @@ public final class LocalScheduler extends AbstractRequestInterceptor { @Override public DistSchedAllocateResponse allocateForDistributedScheduling (AllocateRequest request) throws YarnException, IOException { - LOG.info("Forwarding allocate request to the" + - "Distributed Scheduler Service on YARN RM"); + if (LOG.isDebugEnabled()) { + LOG.debug("Forwarding allocate request to the" + + "Distributed Scheduler Service on YARN RM"); + } // Partition requests into GUARANTEED and OPPORTUNISTIC reqs PartitionedResourceRequests partitionedAsks = partitionAskList(request .getAskList()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/15976306/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java index 03ba61d..e33c389 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java @@ -84,14 +84,14 @@ public class OpportunisticContainerAllocator { Map<String, NodeId> allNodes, String userName) throws YarnException { Map<Resource, List<Container>> containers = new HashMap<>(); Set<String> nodesAllocated = new HashSet<>(); - int numAsks = resourceAsks.size(); for (ResourceRequest anyAsk : resourceAsks) { allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId, allNodes, userName, containers, nodesAllocated, anyAsk); - } - if (numAsks > 0) { - LOG.info("Opportunistic allocation requested for: " + numAsks - + " containers; allocated = " + containers.size()); + LOG.info("Opportunistic allocation requested for [" + + "priority=" + anyAsk.getPriority() + + ", num_containers=" + anyAsk.getNumContainers() + + ", capability=" + anyAsk.getCapability() + "]" + + " allocated = " + containers.get(anyAsk.getCapability()).size()); } return containers; } @@ -129,8 +129,9 @@ public class OpportunisticContainerAllocator { } cList.add(container); numAllocated++; - LOG.info("Allocated " + numAllocated + " opportunistic containers."); + LOG.info("Allocated [" + container.getId() + "] as opportunistic."); } + LOG.info("Allocated " + numAllocated + " opportunistic containers."); } private Container buildContainer(DistSchedulerParams appParams, @@ -146,8 +147,8 @@ public class OpportunisticContainerAllocator { long currTime = System.currentTimeMillis(); ContainerTokenIdentifier containerTokenIdentifier = new ContainerTokenIdentifier( - cId, nodeId.getHost(), userName, capability, - currTime + appParams.containerTokenExpiryInterval, + cId, nodeId.getHost() + ":" + nodeId.getPort(), userName, + capability, currTime + appParams.containerTokenExpiryInterval, context.getContainerTokenSecretManager().getCurrentKey().getKeyId(), nodeStatusUpdater.getRMIdentifier(), rr.getPriority(), currTime, null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK, http://git-wip-us.apache.org/repos/asf/hadoop/blob/15976306/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java index efc682a..e987e79 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.scheduler; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -45,6 +46,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security .NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security .NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -196,9 +198,14 @@ public class TestLocalScheduler { } private Map<NodeId, List<ContainerId>> mapAllocs(AllocateResponse - allocateResponse) { + allocateResponse) throws Exception { Map<NodeId, List<ContainerId>> allocs = new HashMap<>(); for (Container c : allocateResponse.getAllocatedContainers()) { + ContainerTokenIdentifier cTokId = BuilderUtils + .newContainerTokenIdentifier(c.getContainerToken()); + Assert.assertEquals( + c.getNodeId().getHost() + ":" + c.getNodeId().getPort(), + cTokId.getNmHostAddress()); List<ContainerId> cIds = allocs.get(c.getNodeId()); if (cIds == null) { cIds = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/15976306/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java index 21f4f6e..017a256 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java @@ -195,11 +195,11 @@ public class NodeQueueLoadMonitor implements ClusterMonitor { new ClusterNode(rmNode.getNodeID()) .setQueueWaitTime(estimatedQueueWaitTime) .setQueueLength(waitQueueLength)); - LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "]" + + LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "] " + "with queue wait time [" + estimatedQueueWaitTime + "] and " + "wait queue length [" + waitQueueLength + "]"); } else { - LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "]" + + LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "] " + "with queue wait time [" + estimatedQueueWaitTime + "] and " + "wait queue length [" + waitQueueLength + "]"); } @@ -210,12 +210,14 @@ public class NodeQueueLoadMonitor implements ClusterMonitor { .setQueueWaitTime(estimatedQueueWaitTime) .setQueueLength(waitQueueLength) .updateTimestamp(); - LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "]" + - "with queue wait time [" + estimatedQueueWaitTime + "] and " + - "wait queue length [" + waitQueueLength + "]"); + if (LOG.isDebugEnabled()) { + LOG.debug("Updating ClusterNode [" + rmNode.getNodeID() + "] " + + "with queue wait time [" + estimatedQueueWaitTime + "] and " + + "wait queue length [" + waitQueueLength + "]"); + } } else { this.clusterNodes.remove(rmNode.getNodeID()); - LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "]" + + LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "] " + "with queue wait time [" + currentNode.queueWaitTime + "] and " + "wait queue length [" + currentNode.queueLength + "]"); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/15976306/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 2372ea2..de4e22d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -75,6 +75,9 @@ import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing.QueuingContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; @@ -713,8 +716,14 @@ public class MiniYARNCluster extends CompositeService { ContainerExecutor exec, DeletionService del, NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, LocalDirsHandlerService dirsHandler) { - return new CustomContainerManagerImpl(context, exec, del, - nodeStatusUpdater, metrics, dirsHandler); + if (getConfig().getBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, + YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED_DEFAULT)) { + return new CustomQueueingContainerManagerImpl(context, exec, del, + nodeStatusUpdater, metrics, dirsHandler); + } else { + return new CustomContainerManagerImpl(context, exec, del, + nodeStatusUpdater, metrics, dirsHandler); + } } } @@ -846,6 +855,55 @@ public class MiniYARNCluster extends CompositeService { } } + private class CustomQueueingContainerManagerImpl extends + QueuingContainerManagerImpl { + + public CustomQueueingContainerManagerImpl(Context context, + ContainerExecutor exec, DeletionService del, NodeStatusUpdater + nodeStatusUpdater, NodeManagerMetrics metrics, + LocalDirsHandlerService dirsHandler) { + super(context, exec, del, nodeStatusUpdater, metrics, dirsHandler); + } + + @Override + protected ContainersMonitor createContainersMonitor(ContainerExecutor + exec) { + return new ContainersMonitorImpl(exec, dispatcher, this.context) { + + @Override + public void increaseContainersAllocation(ProcessTreeInfo pti) { } + + @Override + public void decreaseContainersAllocation(ProcessTreeInfo pti) { } + + @Override + public boolean hasResourcesAvailable( + ContainersMonitorImpl.ProcessTreeInfo pti) { + return true; + } + }; + } + + @Override + protected void createAMRMProxyService(Configuration conf) { + this.amrmProxyEnabled = + conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, + YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED); + + if (this.amrmProxyEnabled) { + LOG.info("CustomAMRMProxyService is enabled. " + + "All the AM->RM requests will be intercepted by the proxy"); + AMRMProxyService amrmProxyService = + useRpc ? new AMRMProxyService(getContext(), dispatcher) + : new ShortCircuitedAMRMProxy(getContext(), dispatcher); + this.setAMRMProxyService(amrmProxyService); + addService(this.getAMRMProxyService()); + } else { + LOG.info("CustomAMRMProxyService is disabled"); + } + } + } + private class ShortCircuitedAMRMProxy extends AMRMProxyService { public ShortCircuitedAMRMProxy(Context context, --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org