http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d3112ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java new file mode 100644 index 0000000..eb9ec4b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java @@ -0,0 +1,437 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.scheduler; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; +import org.apache.hadoop.yarn.api.protocolrecords + .FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords + .FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords + .RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords + .RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy + .AMRMProxyApplicationContext; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor; + + + +import org.apache.hadoop.yarn.server.nodemanager.security + .NMTokenSecretManagerInNM; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import 
java.util.TreeMap; + +/** + * <p>The LocalScheduler runs on the NodeManager and is modelled as an + * <code>AMRMProxy</code> request interceptor. It is responsible for the + * following :</p> + * <ul> + * <li>Intercept <code>ApplicationMasterProtocol</code> calls and unwrap the + * response objects to extract instructions from the + * <code>ClusterManager</code> running on the ResourceManager to aid in making + * Scheduling scheduling decisions</li> + * <li>Call the <code>OpportunisticContainerAllocator</code> to allocate + * containers for the opportunistic resource outstandingOpReqs</li> + * </ul> + */ +public final class LocalScheduler extends AbstractRequestInterceptor { + + static class PartitionedResourceRequests { + private List<ResourceRequest> guaranteed = new ArrayList<>(); + private List<ResourceRequest> opportunistic = new ArrayList<>(); + public List<ResourceRequest> getGuaranteed() { + return guaranteed; + } + public List<ResourceRequest> getOpportunistic() { + return opportunistic; + } + } + + static class DistSchedulerParams { + Resource maxResource; + Resource minResource; + Resource incrementResource; + int containerTokenExpiryInterval; + } + + private static final Logger LOG = LoggerFactory + .getLogger(LocalScheduler.class); + + // Currently just used to keep track of allocated Containers + // Can be used for reporting stats later + private Set<ContainerId> containersAllocated = new HashSet<>(); + + private DistSchedulerParams appParams = new DistSchedulerParams(); + private final OpportunisticContainerAllocator.ContainerIdCounter containerIdCounter = + new OpportunisticContainerAllocator.ContainerIdCounter(); + private Map<String, NodeId> nodeList = new HashMap<>(); + + // Mapping of NodeId to NodeTokens. Populated either from RM response or + // generated locally if required. 
+ private Map<NodeId, NMToken> nodeTokens = new HashMap<>(); + final Set<String> blacklist = new HashSet<>(); + + // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority, + // Resource Name (Host/rack/any) and capability. This mapping is required + // to match a received Container to an outstanding OPPORTUNISTIC + // ResourceRequests (ask) + final TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>> + outstandingOpReqs = new TreeMap<>(); + + private ApplicationAttemptId applicationAttemptId; + private OpportunisticContainerAllocator containerAllocator; + private NMTokenSecretManagerInNM nmSecretManager; + private String appSubmitter; + + public void init(AMRMProxyApplicationContext appContext) { + super.init(appContext); + initLocal(appContext.getApplicationAttemptId(), + appContext.getNMCotext().getContainerAllocator(), + appContext.getNMCotext().getNMTokenSecretManager(), + appContext.getUser()); + } + + @VisibleForTesting + void initLocal(ApplicationAttemptId applicationAttemptId, + OpportunisticContainerAllocator containerAllocator, + NMTokenSecretManagerInNM nmSecretManager, String appSubmitter) { + this.applicationAttemptId = applicationAttemptId; + this.containerAllocator = containerAllocator; + this.nmSecretManager = nmSecretManager; + this.appSubmitter = appSubmitter; + } + + /** + * Route register call to the corresponding distributed scheduling method viz. + * registerApplicationMasterForDistributedScheduling, and return response to + * the caller after stripping away Distributed Scheduling information. 
+ * + * @param request + * registration request + * @return Allocate Response + * @throws YarnException + * @throws IOException + */ + @Override + public RegisterApplicationMasterResponse registerApplicationMaster + (RegisterApplicationMasterRequest request) throws YarnException, + IOException { + return registerApplicationMasterForDistributedScheduling(request) + .getRegisterResponse(); + } + + /** + * Route allocate call to the allocateForDistributedScheduling method and + * return response to the caller after stripping away Distributed Scheduling + * information. + * + * @param request + * allocation request + * @return Allocate Response + * @throws YarnException + * @throws IOException + */ + @Override + public AllocateResponse allocate(AllocateRequest request) throws + YarnException, IOException { + return allocateForDistributedScheduling(request).getAllocateResponse(); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster + (FinishApplicationMasterRequest request) throws YarnException, + IOException { + return getNextInterceptor().finishApplicationMaster(request); + } + + /** + * Check if we already have a NMToken. 
if Not, generate the Token and + * add it to the response + * @param response + * @param nmTokens + * @param allocatedContainers + */ + private void updateResponseWithNMTokens(AllocateResponse response, + List<NMToken> nmTokens, List<Container> allocatedContainers) { + List<NMToken> newTokens = new ArrayList<>(); + if (allocatedContainers.size() > 0) { + response.getAllocatedContainers().addAll(allocatedContainers); + for (Container alloc : allocatedContainers) { + if (!nodeTokens.containsKey(alloc.getNodeId())) { + newTokens.add(nmSecretManager.generateNMToken(appSubmitter, alloc)); + } + } + List<NMToken> retTokens = new ArrayList<>(nmTokens); + retTokens.addAll(newTokens); + response.setNMTokens(retTokens); + } + } + + private PartitionedResourceRequests partitionAskList(List<ResourceRequest> + askList) { + PartitionedResourceRequests partitionedRequests = + new PartitionedResourceRequests(); + for (ResourceRequest rr : askList) { + if (rr.getExecutionType() == ExecutionType.OPPORTUNISTIC) { + partitionedRequests.getOpportunistic().add(rr); + } else { + partitionedRequests.getGuaranteed().add(rr); + } + } + return partitionedRequests; + } + + private void updateParameters( + DistSchedRegisterResponse registerResponse) { + appParams.minResource = registerResponse.getMinAllocatableCapabilty(); + appParams.maxResource = registerResponse.getMaxAllocatableCapabilty(); + appParams.incrementResource = + registerResponse.getIncrAllocatableCapabilty(); + if (appParams.incrementResource == null) { + appParams.incrementResource = appParams.minResource; + } + appParams.containerTokenExpiryInterval = registerResponse + .getContainerTokenExpiryInterval(); + + containerIdCounter + .resetContainerIdCounter(registerResponse.getContainerIdStart()); + setNodeList(registerResponse.getNodesForScheduling()); + } + + /** + * Takes a list of ResourceRequests (asks), extracts the key information viz. 
+ * (Priority, ResourceName, Capability) and adds it the outstanding + * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce + * the current YARN constraint that only a single ResourceRequest can exist at + * a give Priority and Capability + * @param resourceAsks + */ + public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) { + for (ResourceRequest request : resourceAsks) { + // Handling locality for opportunistic tokens is optional; we rely on + // "anyAsk" to drive allocations + Priority priority = request.getPriority(); + String resourceName = request.getResourceName(); + + if (!ResourceRequest.isAnyLocation(request.getResourceName())) { + continue; + } + + if (request.getNumContainers() == 0) { + continue; + } + + Map<String, Map<Resource, ResourceRequest>> asks = + this.outstandingOpReqs.get(priority); + if (asks == null) { + asks = new HashMap<>(); + this.outstandingOpReqs.put(priority, asks); + } + + Map<Resource, ResourceRequest> reqMap = asks.get(resourceName); + if (reqMap == null) { + reqMap = new HashMap<>(); + asks.put(resourceName, reqMap); + } + + ResourceRequest resourceRequest = reqMap.get(request.getCapability()); + if (resourceRequest == null) { + resourceRequest = request; + reqMap.put(request.getCapability(), request); + } else { + resourceRequest.setNumContainers( + resourceRequest.getNumContainers() + request.getNumContainers()); + } + if (ResourceRequest.isAnyLocation(request.getResourceName())) { + LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority + + ", with capability = " + request.getCapability() + " ) : " + + resourceRequest.getNumContainers()); + } + } + } + + /** + * This method matches a returned list of Container Allocations to any + * outstanding OPPORTUNISTIC ResourceRequest + * @param capability + * @param allocatedContainers + */ + public void matchAllocationToOutstandingRequest(Resource capability, + List<Container> allocatedContainers) { + for (Container c : 
allocatedContainers) { + containersAllocated.add(c.getId()); + Map<String, Map<Resource, ResourceRequest>> asks = this + .outstandingOpReqs.get(c.getPriority()); + + if (asks == null) + continue; + + // Host specific accounting + removeFromReqMap(capability, asks.get(c.getNodeId().getHost())); + + // any ask accounting + removeFromReqMap(capability, asks.get(ResourceRequest.ANY)); + } + } + + private void removeFromReqMap(Resource capability, Map<Resource, + ResourceRequest> rrMap) { + if (rrMap != null) { + ResourceRequest rr = rrMap.get(capability); + if (rr != null) { + rr.setNumContainers(rr.getNumContainers() - 1); + if (rr.getNumContainers() == 0) + rrMap.remove(capability); + } + } + } + + private void setNodeList(List<NodeId> nodeList) { + this.nodeList.clear(); + addToNodeList(nodeList); + } + + private void addToNodeList(List<NodeId> nodes) { + for (NodeId n : nodes) { + this.nodeList.put(n.getHost(), n); + } + } + + @Override + public DistSchedRegisterResponse + registerApplicationMasterForDistributedScheduling + (RegisterApplicationMasterRequest request) throws YarnException, + IOException { + LOG.info("Forwarding registration request to the" + + "Distributed Scheduler Service on YARN RM"); + DistSchedRegisterResponse dsResp = getNextInterceptor() + .registerApplicationMasterForDistributedScheduling(request); + updateParameters(dsResp); + return dsResp; + } + + @Override + public DistSchedAllocateResponse allocateForDistributedScheduling + (AllocateRequest request) throws YarnException, IOException { + LOG.info("Forwarding allocate request to the" + + "Distributed Scheduler Service on YARN RM"); + // Partition requests into GUARANTEED and OPPORTUNISTIC reqs + PartitionedResourceRequests partitionedAsks = partitionAskList(request + .getAskList()); + + List<ContainerId> releasedContainers = request.getReleaseList(); + int numReleasedContainers = releasedContainers.size(); + if (numReleasedContainers > 0) { + LOG.info("AttemptID: " + applicationAttemptId + 
" released: " + + numReleasedContainers); + containersAllocated.removeAll(releasedContainers); + } + + // Also, update black list + ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest(); + if (rbr != null) { + blacklist.removeAll(rbr.getBlacklistRemovals()); + blacklist.addAll(rbr.getBlacklistAdditions()); + } + + // Add OPPORTUNISTIC reqs to the outstanding reqs + addToOutstandingReqs(partitionedAsks.getOpportunistic()); + + List<Container> allocatedContainers = new ArrayList<>(); + for (Priority priority : outstandingOpReqs.descendingKeySet()) { + for (Map<Resource, ResourceRequest> reqMap : + outstandingOpReqs.get(priority).values()) { + // Allocated containers : + // Key = Requested Capability, + // Value = List of Containers of given Cap (The actual container size + // might be different than what is requested.. which is why + // we need the requested capability (key) to match against + // the outstanding reqs) + Map<Resource, List<Container>> allocated = + containerAllocator.allocate(this.appParams, containerIdCounter, + reqMap.values(), blacklist, applicationAttemptId, nodeList, + appSubmitter); + for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) { + matchAllocationToOutstandingRequest(e.getKey(), e.getValue()); + allocatedContainers.addAll(e.getValue()); + } + } + } + + // Send all the GUARANTEED Reqs to RM + request.setAskList(partitionedAsks.getGuaranteed()); + DistSchedAllocateResponse dsResp = + getNextInterceptor().allocateForDistributedScheduling(request); + + // Update host to nodeId mapping + setNodeList(dsResp.getNodesForScheduling()); + List<NMToken> nmTokens = dsResp.getAllocateResponse().getNMTokens(); + for (NMToken nmToken : nmTokens) { + nodeTokens.put(nmToken.getNodeId(), nmToken); + } + + List<ContainerStatus> completedContainers = + dsResp.getAllocateResponse().getCompletedContainersStatuses(); + + // Only account for opportunistic containers + for (ContainerStatus cs : completedContainers) { + if 
(cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) { + containersAllocated.remove(cs.getContainerId()); + } + } + + // Check if we have NM tokens for all the allocated containers. If not + // generate one and update the response. + updateResponseWithNMTokens( + dsResp.getAllocateResponse(), nmTokens, allocatedContainers); + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Number of opportunistic containers currently allocated by" + + "application: " + containersAllocated.size()); + } + return dsResp; + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d3112ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java new file mode 100644 index 0000000..7b2a258 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.scheduler; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ContainerType; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.scheduler.LocalScheduler.DistSchedulerParams; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.net.InetSocketAddress; +import java.util.*; +import java.util.concurrent.atomic.AtomicLong; + +/** + * <p>The OpportunisticContainerAllocator allocates containers on a given list + * of Nodes after it modifies the container sizes to within allowable limits + * specified by the <code>ClusterManager</code> running on the RM. It tries to + * distribute the containers as evenly as possible. 
It also uses the + * <code>NMTokenSecretManagerInNM</code> to generate the required NM tokens for + * the allocated containers</p> + */ +public class OpportunisticContainerAllocator { + + private static final Log LOG = + LogFactory.getLog(OpportunisticContainerAllocator.class); + + private static final ResourceCalculator RESOURCE_CALCULATOR = + new DominantResourceCalculator(); + + static class ContainerIdCounter { + final AtomicLong containerIdCounter = new AtomicLong(1); + + void resetContainerIdCounter(long containerIdStart) { + this.containerIdCounter.set(containerIdStart); + } + + long generateContainerId() { + return this.containerIdCounter.decrementAndGet(); + } + } + + private final NodeStatusUpdater nodeStatusUpdater; + private final Context context; + private int webpagePort; + + public OpportunisticContainerAllocator(NodeStatusUpdater nodeStatusUpdater, + Context context, int webpagePort) { + this.nodeStatusUpdater = nodeStatusUpdater; + this.context = context; + this.webpagePort = webpagePort; + } + + public Map<Resource, List<Container>> allocate(DistSchedulerParams appParams, + ContainerIdCounter idCounter, Collection<ResourceRequest> resourceAsks, + Set<String> blacklist, ApplicationAttemptId appAttId, + Map<String, NodeId> allNodes, String userName) throws YarnException { + Map<Resource, List<Container>> containers = new HashMap<>(); + Set<String> nodesAllocated = new HashSet<>(); + List<ResourceRequest> anyAsks = new ArrayList<>(resourceAsks); + for (ResourceRequest anyAsk : anyAsks) { + allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId, + allNodes, userName, containers, nodesAllocated, anyAsk); + } + if (resourceAsks.size() > 0) { + LOG.info("Opportunistic allocation requested for: " + resourceAsks.size() + + " containers; allocated = " + containers.size()); + } + return containers; + } + + private void allocateOpportunisticContainers(DistSchedulerParams appParams, + ContainerIdCounter idCounter, Set<String> blacklist, + 
ApplicationAttemptId id, Map<String, NodeId> allNodes, String userName, + Map<Resource, List<Container>> containers, Set<String> nodesAllocated, + ResourceRequest anyAsk) throws YarnException { + int toAllocate = anyAsk.getNumContainers() + - (containers.isEmpty() ? + 0 : containers.get(anyAsk.getCapability()).size()); + + List<String> topKNodesLeft = new ArrayList<>(); + for (String s : allNodes.keySet()) { + // Bias away from whatever we have already allocated and respect blacklist + if (nodesAllocated.contains(s) || blacklist.contains(s)) { + continue; + } + topKNodesLeft.add(s); + } + int numAllocated = 0; + int nextNodeToAllocate = 0; + for (int numCont = 0; numCont < toAllocate; numCont++) { + String topNode = topKNodesLeft.get(nextNodeToAllocate); + nextNodeToAllocate++; + nextNodeToAllocate %= topKNodesLeft.size(); + NodeId nodeId = allNodes.get(topNode); + Container container = buildContainer(appParams, idCounter, anyAsk, id, + userName, nodeId); + List<Container> cList = containers.get(anyAsk.getCapability()); + if (cList == null) { + cList = new ArrayList<>(); + containers.put(anyAsk.getCapability(), cList); + } + cList.add(container); + numAllocated++; + LOG.info("Allocated " + numAllocated + " opportunistic containers."); + } + } + + private Container buildContainer(DistSchedulerParams appParams, + ContainerIdCounter idCounter, ResourceRequest rr, ApplicationAttemptId id, + String userName, NodeId nodeId) throws YarnException { + ContainerId cId = + ContainerId.newContainerId(id, idCounter.generateContainerId()); + + // Normalize the resource asks (Similar to what the the RM scheduler does + // before accepting an ask) + Resource capability = normalizeCapability(appParams, rr); + + long currTime = System.currentTimeMillis(); + ContainerTokenIdentifier containerTokenIdentifier = + new ContainerTokenIdentifier( + cId, nodeId.getHost(), userName, capability, + currTime + appParams.containerTokenExpiryInterval, + 
context.getContainerTokenSecretManager().getCurrentKey().getKeyId(), + nodeStatusUpdater.getRMIdentifier(), rr.getPriority(), currTime, + null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK, + ExecutionType.OPPORTUNISTIC); + byte[] pwd = + context.getContainerTokenSecretManager().createPassword( + containerTokenIdentifier); + Token containerToken = newContainerToken(nodeId, pwd, + containerTokenIdentifier); + Container container = BuilderUtils.newContainer( + cId, nodeId, nodeId.getHost() + ":" + webpagePort, + capability, rr.getPriority(), containerToken); + return container; + } + + private Resource normalizeCapability(DistSchedulerParams appParams, + ResourceRequest ask) { + return Resources.normalize(RESOURCE_CALCULATOR, + ask.getCapability(), appParams.minResource, appParams.maxResource, + appParams.incrementResource); + } + + public static Token newContainerToken(NodeId nodeId, byte[] password, + ContainerTokenIdentifier tokenIdentifier) { + // RPC layer client expects ip:port as service for tokens + InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(), + nodeId.getPort()); + // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token + Token containerToken = Token.newInstance(tokenIdentifier.getBytes(), + ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil + .buildTokenService(addr).toString()); + return containerToken; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d3112ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java index f6169e7..86cce35 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java @@ -29,7 +29,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; @@ -50,7 +53,7 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager { private final Map<ApplicationAttemptId, MasterKeyData> oldMasterKeys; private final Map<ApplicationId, List<ApplicationAttemptId>> appToAppAttemptMap; private final NMStateStoreService stateStore; - private NodeId nodeId; + private NodeId nodeId; public NMTokenSecretManagerInNM() { this(new NMNullStateStoreService()); @@ -276,4 +279,23 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager { LOG.error("Unable to remove master key for application " + attempt, e); } } + + /** + * Used by the Distributed Scheduler framework to generate NMTokens + * @param applicationSubmitter + * @param container + * @return NMToken + */ + public NMToken 
generateNMToken( + String applicationSubmitter, Container container) { + this.readLock.lock(); + try { + Token token = + createNMToken(container.getId().getApplicationAttemptId(), + container.getNodeId(), applicationSubmitter); + return NMToken.newInstance(container.getNodeId(), token); + } finally { + this.readLock.unlock(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d3112ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java index ddbfbb9..f126080 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java @@ -80,7 +80,7 @@ public class TestEventFlow { Context context = new NMContext(new NMContainerTokenSecretManager(conf), new NMTokenSecretManagerInNM(), null, null, - new NMNullStateStoreService()) { + new NMNullStateStoreService(), false) { @Override public int getHttpPort() { return 1234; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d3112ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index fa8d131..a5b004e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -1584,7 +1584,7 @@ public class TestNodeStatusUpdater { protected NMContext createNMContext( NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, - NMStateStoreService store) { + NMStateStoreService store, boolean isDistributedSchedulingEnabled) { return new MyNMContext(containerTokenSecretManager, nmTokenSecretManager); } @@ -1819,7 +1819,7 @@ public class TestNodeStatusUpdater { NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager) { super(containerTokenSecretManager, nmTokenSecretManager, null, null, - new NMNullStateStoreService()); + new NMNullStateStoreService(), false); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d3112ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 1aea9d2..9a25a29 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -683,5 +684,14 @@ public abstract class BaseAMRMProxyTest { public NodeStatusUpdater getNodeStatusUpdater() { return null; } + + public boolean isDistributedSchedulingEnabled() { + return false; + } + + @Override + public OpportunisticContainerAllocator getContainerAllocator() { + return null; + } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d3112ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 787778e..6f09154 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -110,7 +110,7 @@ public abstract class BaseContainerManagerTest { protected Configuration conf = new YarnConfiguration(); protected Context context = new NMContext(new NMContainerTokenSecretManager( conf), new NMTokenSecretManagerInNM(), null, - new ApplicationACLsManager(conf), new NMNullStateStoreService()) { + new ApplicationACLsManager(conf), new NMNullStateStoreService(), false) { public int getHttpPort() { return HTTP_PORT; }; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d3112ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index 2e014de..dfb7a1b 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -471,7 +471,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest { NMStateStoreService stateStore) { NMContext context = new NMContext(new NMContainerTokenSecretManager( conf), new NMTokenSecretManagerInNM(), null, - new ApplicationACLsManager(conf), stateStore){ + new ApplicationACLsManager(conf), stateStore, false){ public int getHttpPort() { return HTTP_PORT; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d3112ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java index 1169c68..cf7ca8d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java @@ -113,7 +113,7 @@ public class TestContainerLaunch extends 
BaseContainerManagerTest { private static final String INVALID_JAVA_HOME = "/no/jvm/here"; protected Context distContext = new NMContext(new NMContainerTokenSecretManager( conf), new NMTokenSecretManagerInNM(), null, - new ApplicationACLsManager(conf), new NMNullStateStoreService()) { + new ApplicationACLsManager(conf), new NMNullStateStoreService(), false) { public int getHttpPort() { return HTTP_PORT; }; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d3112ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java index 9e08b7f..c768df1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java @@ -81,7 +81,8 @@ public class TestLocalCacheDirectoryManager { NMContext nmContext = new NMContext(new NMContainerTokenSecretManager(conf), new NMTokenSecretManagerInNM(), null, - new ApplicationACLsManager(conf), new NMNullStateStoreService()); + new ApplicationACLsManager(conf), new NMNullStateStoreService(), + false); ResourceLocalizationService service = new 
ResourceLocalizationService(null, null, null, null, nmContext); try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d3112ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 596f784..5fc71c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -186,7 +186,7 @@ public class TestResourceLocalizationService { conf.set(YarnConfiguration.NM_LOG_DIRS, logDir); nmContext = new NMContext(new NMContainerTokenSecretManager( conf), new NMTokenSecretManagerInNM(), null, - new ApplicationACLsManager(conf), new NMNullStateStoreService()); + new ApplicationACLsManager(conf), new NMNullStateStoreService(), false); } @After @@ -2365,7 +2365,7 @@ public class TestResourceLocalizationService { NMContext nmContext = new NMContext(new NMContainerTokenSecretManager(conf), new NMTokenSecretManagerInNM(), null, - new ApplicationACLsManager(conf), stateStore); + new ApplicationACLsManager(conf), stateStore, false); ResourceLocalizationService rawService = new 
ResourceLocalizationService(dispatcher, exec, delService, dirsHandler, nmContext); http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d3112ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java new file mode 100644 index 0000000..efc682a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.scheduler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; +import org.apache.hadoop.yarn.api.protocolrecords + .RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords + .RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor; +import org.apache.hadoop.yarn.server.nodemanager.security + .NMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.nodemanager.security + .NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import 
java.util.Map; + +public class TestLocalScheduler { + + @Test + public void testLocalScheduler() throws Exception { + + Configuration conf = new Configuration(); + LocalScheduler localScheduler = new LocalScheduler(); + + NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class); + Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l); + Context context = Mockito.mock(Context.class); + NMContainerTokenSecretManager nmContainerTokenSecretManager = new + NMContainerTokenSecretManager(conf); + MasterKey mKey = new MasterKey() { + @Override + public int getKeyId() { + return 1; + } + @Override + public void setKeyId(int keyId) {} + @Override + public ByteBuffer getBytes() { + return ByteBuffer.allocate(8); + } + @Override + public void setBytes(ByteBuffer bytes) {} + }; + nmContainerTokenSecretManager.setMasterKey(mKey); + Mockito.when(context.getContainerTokenSecretManager()).thenReturn + (nmContainerTokenSecretManager); + OpportunisticContainerAllocator containerAllocator = + new OpportunisticContainerAllocator(nodeStatusUpdater, context, 7777); + + NMTokenSecretManagerInNM nmTokenSecretManagerInNM = + new NMTokenSecretManagerInNM(); + nmTokenSecretManagerInNM.setMasterKey(mKey); + localScheduler.initLocal( + ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1), + containerAllocator, nmTokenSecretManagerInNM, "test"); + + RequestInterceptor finalReqIntcptr = Mockito.mock(RequestInterceptor.class); + localScheduler.setNextInterceptor(finalReqIntcptr); + + DistSchedRegisterResponse distSchedRegisterResponse = + Records.newRecord(DistSchedRegisterResponse.class); + distSchedRegisterResponse.setRegisterResponse( + Records.newRecord(RegisterApplicationMasterResponse.class)); + distSchedRegisterResponse.setContainerTokenExpiryInterval(12345); + distSchedRegisterResponse.setContainerIdStart(0); + distSchedRegisterResponse.setMaxAllocatableCapabilty( + Resource.newInstance(1024, 4)); + 
distSchedRegisterResponse.setMinAllocatableCapabilty( + Resource.newInstance(512, 2)); + distSchedRegisterResponse.setNodesForScheduling(Arrays.asList( + NodeId.newInstance("a", 1), NodeId.newInstance("b", 2))); + Mockito.when( + finalReqIntcptr.registerApplicationMasterForDistributedScheduling( + Mockito.any(RegisterApplicationMasterRequest.class))) + .thenReturn(distSchedRegisterResponse); + + localScheduler.registerApplicationMaster( + Records.newRecord(RegisterApplicationMasterRequest.class)); + + Mockito.when( + finalReqIntcptr.allocateForDistributedScheduling( + Mockito.any(AllocateRequest.class))) + .thenAnswer(new Answer<DistSchedAllocateResponse>() { + @Override + public DistSchedAllocateResponse answer(InvocationOnMock + invocationOnMock) throws Throwable { + return createAllocateResponse(Arrays.asList( + NodeId.newInstance("c", 3), NodeId.newInstance("d", 4))); + } + }); + + AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); + ResourceRequest guaranteedReq = Records.newRecord(ResourceRequest.class); + guaranteedReq.setExecutionType(ExecutionType.GUARANTEED); + guaranteedReq.setNumContainers(5); + guaranteedReq.setCapability(Resource.newInstance(2048, 2)); + guaranteedReq.setRelaxLocality(true); + guaranteedReq.setResourceName("*"); + ResourceRequest opportunisticReq = Records.newRecord(ResourceRequest.class); + opportunisticReq.setExecutionType(ExecutionType.OPPORTUNISTIC); + opportunisticReq.setNumContainers(4); + opportunisticReq.setCapability(Resource.newInstance(1024, 4)); + opportunisticReq.setPriority(Priority.newInstance(100)); + opportunisticReq.setRelaxLocality(true); + opportunisticReq.setResourceName("*"); + allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); + + // Verify 4 containers were allocated + AllocateResponse allocateResponse = localScheduler.allocate(allocateRequest); + Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size()); + + // Verify equal distribution on hosts 
a and b + // And None on c and d + Map<NodeId, List<ContainerId>> allocs = mapAllocs(allocateResponse); + Assert.assertEquals(2, allocs.get(NodeId.newInstance("a", 1)).size()); + Assert.assertEquals(2, allocs.get(NodeId.newInstance("b", 2)).size()); + Assert.assertNull(allocs.get(NodeId.newInstance("c", 3))); + Assert.assertNull(allocs.get(NodeId.newInstance("d", 4))); + + // New Allocate request + allocateRequest = Records.newRecord(AllocateRequest.class); + opportunisticReq = Records.newRecord(ResourceRequest.class); + opportunisticReq.setExecutionType(ExecutionType.OPPORTUNISTIC); + opportunisticReq.setNumContainers(6); + opportunisticReq.setCapability(Resource.newInstance(512, 3)); + opportunisticReq.setPriority(Priority.newInstance(100)); + opportunisticReq.setRelaxLocality(true); + opportunisticReq.setResourceName("*"); + allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); + + // Verify 6 containers were allocated + allocateResponse = localScheduler.allocate(allocateRequest); + Assert.assertEquals(6, allocateResponse.getAllocatedContainers().size()); + + // Verify New containers are equally distribution on hosts c and d + // And None on a and b + allocs = mapAllocs(allocateResponse); + Assert.assertEquals(3, allocs.get(NodeId.newInstance("c", 3)).size()); + Assert.assertEquals(3, allocs.get(NodeId.newInstance("d", 4)).size()); + Assert.assertNull(allocs.get(NodeId.newInstance("a", 1))); + Assert.assertNull(allocs.get(NodeId.newInstance("b", 2))); + } + + private DistSchedAllocateResponse createAllocateResponse(List<NodeId> nodes) { + DistSchedAllocateResponse distSchedAllocateResponse = Records.newRecord + (DistSchedAllocateResponse.class); + distSchedAllocateResponse.setAllocateResponse( + Records.newRecord(AllocateResponse.class)); + distSchedAllocateResponse.setNodesForScheduling(nodes); + return distSchedAllocateResponse; + } + + private Map<NodeId, List<ContainerId>> mapAllocs(AllocateResponse + allocateResponse) { + Map<NodeId, 
List<ContainerId>> allocs = new HashMap<>(); + for (Container c : allocateResponse.getAllocatedContainers()) { + List<ContainerId> cIds = allocs.get(c.getNodeId()); + if (cIds == null) { + cIds = new ArrayList<>(); + allocs.put(c.getNodeId(), cIds); + } + cIds.add(c.getId()); + } + return allocs; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d3112ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java index 84e42fc..6a72cc0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java @@ -96,7 +96,7 @@ public class TestContainerLogsPage { healthChecker.init(conf); LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler(); NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler, - new ApplicationACLsManager(conf), new NMNullStateStoreService()); + new ApplicationACLsManager(conf), new NMNullStateStoreService(), false); // Add an application and the corresponding containers RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(conf); String user = "nobody"; @@ -136,7 +136,7 @@ public class TestContainerLogsPage { when(dirsHandlerForFullDisk.getLogDirsForRead()). 
thenReturn(Arrays.asList(new String[] {absLogDir.getAbsolutePath()})); nmContext = new NodeManager.NMContext(null, null, dirsHandlerForFullDisk, - new ApplicationACLsManager(conf), new NMNullStateStoreService()); + new ApplicationACLsManager(conf), new NMNullStateStoreService(), false); nmContext.getApplications().put(appId, app); container.setState(ContainerState.RUNNING); nmContext.getContainers().put(container1, container); @@ -158,7 +158,7 @@ public class TestContainerLogsPage { LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); dirsHandler.init(conf); NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler, - new ApplicationACLsManager(conf), new NMNullStateStoreService()); + new ApplicationACLsManager(conf), new NMNullStateStoreService(), false); // Add an application and the corresponding containers String user = "nobody"; long clusterTimeStamp = 1234; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d3112ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java index f0c7cbc..b90c1be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java @@ -87,7 +87,7 @@ public class TestNMWebServer { private int 
startNMWebAppServer(String webAddr) { Context nmContext = new NodeManager.NMContext(null, null, null, null, - null); + null, false); ResourceView resourceView = new ResourceView() { @Override public long getVmemAllocatedForContainers() { @@ -150,7 +150,7 @@ public class TestNMWebServer { @Test public void testNMWebApp() throws IOException, YarnException { Context nmContext = new NodeManager.NMContext(null, null, null, null, - null); + null, false); ResourceView resourceView = new ResourceView() { @Override public long getVmemAllocatedForContainers() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d3112ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java index 1f5590c..2ac0956 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java @@ -111,7 +111,7 @@ public class TestNMWebServices extends JerseyTestBase { healthChecker.init(conf); aclsManager = new ApplicationACLsManager(conf); nmContext = new NodeManager.NMContext(null, null, dirsHandler, - aclsManager, null); + aclsManager, null, false); NodeId nodeId = NodeId.newInstance("testhost.foo.com", 8042); ((NodeManager.NMContext)nmContext).setNodeId(nodeId); resourceView = new ResourceView() 
{ http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d3112ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java index e274abb..dfbcf06 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java @@ -104,7 +104,7 @@ public class TestNMWebServicesApps extends JerseyTestBase { dirsHandler = healthChecker.getDiskHandler(); aclsManager = new ApplicationACLsManager(conf); nmContext = new NodeManager.NMContext(null, null, dirsHandler, - aclsManager, null); + aclsManager, null, false); NodeId nodeId = NodeId.newInstance("testhost.foo.com", 9999); ((NodeManager.NMContext)nmContext).setNodeId(nodeId); resourceView = new ResourceView() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d3112ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java index 3c4a660..18239f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java @@ -136,7 +136,7 @@ public class TestNMWebServicesContainers extends JerseyTestBase { dirsHandler = healthChecker.getDiskHandler(); aclsManager = new ApplicationACLsManager(conf); nmContext = new NodeManager.NMContext(null, null, dirsHandler, - aclsManager, null) { + aclsManager, null, false) { public NodeId getNodeId() { return NodeId.newInstance("testhost.foo.com", 8042); }; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d3112ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index ab94175..4f90fa0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -48,6 +48,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; + + import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -89,6 +91,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.security + .AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -104,21 +108,27 @@ public class ApplicationMasterService extends AbstractService implements private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class); private final AMLivelinessMonitor amLivelinessMonitor; private YarnScheduler rScheduler; - private InetSocketAddress masterServiceAddress; - private Server server; - private final RecordFactory recordFactory = + protected InetSocketAddress masterServiceAddress; + protected Server server; + protected final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap = new 
ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>(); - private final RMContext rmContext; + protected final RMContext rmContext; - public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) { - super(ApplicationMasterService.class.getName()); + public ApplicationMasterService(String name, RMContext rmContext, + YarnScheduler scheduler) { + super(name); this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor(); this.rScheduler = scheduler; this.rmContext = rmContext; } + public ApplicationMasterService(RMContext rmContext, + YarnScheduler scheduler) { + this(ApplicationMasterService.class.getName(), rmContext, scheduler); + } + @Override protected void serviceInit(Configuration conf) throws Exception { masterServiceAddress = conf.getSocketAddr( @@ -139,11 +149,8 @@ public class ApplicationMasterService extends AbstractService implements serverConf.set( CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, SaslRpcServer.AuthMethod.TOKEN.toString()); - this.server = - rpc.getServer(ApplicationMasterProtocol.class, this, masterServiceAddress, - serverConf, this.rmContext.getAMRMTokenSecretManager(), - serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT, - YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT)); + this.server = getServer(rpc, serverConf, masterServiceAddress, + this.rmContext.getAMRMTokenSecretManager()); // Enable service authorization? 
if (conf.getBoolean( @@ -158,7 +165,7 @@ public class ApplicationMasterService extends AbstractService implements } refreshServiceAcls(conf, RMPolicyProvider.getInstance()); } - + this.server.start(); this.masterServiceAddress = conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST, @@ -168,6 +175,14 @@ public class ApplicationMasterService extends AbstractService implements super.serviceStart(); } + protected Server getServer(YarnRPC rpc, Configuration serverConf, + InetSocketAddress addr, AMRMTokenSecretManager secretManager) { + return rpc.getServer(ApplicationMasterProtocol.class, this, addr, + serverConf, secretManager, + serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT)); + } + @Private public InetSocketAddress getBindAddress() { return this.masterServiceAddress; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d3112ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java new file mode 100644 index 0000000..5210f7f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; +import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol; +import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl; + + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import 
org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.security + .AMRMTokenSecretManager; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; + +public class DistributedSchedulingService extends ApplicationMasterService + implements DistributedSchedulerProtocol { + + public DistributedSchedulingService(RMContext rmContext, + YarnScheduler scheduler) { + super(DistributedSchedulingService.class.getName(), rmContext, scheduler); + } + + @Override + public Server getServer(YarnRPC rpc, Configuration serverConf, + InetSocketAddress addr, AMRMTokenSecretManager secretManager) { + Server server = rpc.getServer(DistributedSchedulerProtocol.class, this, + addr, serverConf, secretManager, + serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT)); + // To support applications running on NMs that DO NOT support + // Dist Scheduling...
+ ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, + ApplicationMasterProtocolPB.class, + ApplicationMasterProtocolService.newReflectiveBlockingService( + new ApplicationMasterProtocolPBServiceImpl(this))); + return server; + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster + (RegisterApplicationMasterRequest request) throws YarnException, + IOException { + return super.registerApplicationMaster(request); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster + (FinishApplicationMasterRequest request) throws YarnException, + IOException { + return super.finishApplicationMaster(request); + } + + @Override + public AllocateResponse allocate(AllocateRequest request) throws + YarnException, IOException { + return super.allocate(request); + } + + @Override + public DistSchedRegisterResponse + registerApplicationMasterForDistributedScheduling( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + RegisterApplicationMasterResponse response = + registerApplicationMaster(request); + DistSchedRegisterResponse dsResp = recordFactory + .newRecordInstance(DistSchedRegisterResponse.class); + dsResp.setRegisterResponse(response); + dsResp.setMinAllocatableCapabilty( + Resource.newInstance( + getConfig().getInt( + YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY, + YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY_DEFAULT), + getConfig().getInt( + YarnConfiguration.DIST_SCHEDULING_MIN_VCORES, + YarnConfiguration.DIST_SCHEDULING_MIN_VCORES_DEFAULT) + ) + ); + dsResp.setMaxAllocatableCapabilty( + Resource.newInstance( + getConfig().getInt( + YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY, + YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_DEFAULT), + getConfig().getInt( + YarnConfiguration.DIST_SCHEDULING_MAX_VCORES, + YarnConfiguration.DIST_SCHEDULING_MAX_VCORES_DEFAULT) + ) + ); + dsResp.setIncrAllocatableCapabilty( + Resource.newInstance( + getConfig().getInt( + 
YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY, + YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY_DEFAULT), + getConfig().getInt( + YarnConfiguration.DIST_SCHEDULING_INCR_VCORES, + YarnConfiguration.DIST_SCHEDULING_INCR_VCORES_DEFAULT) + ) + ); + dsResp.setContainerTokenExpiryInterval( + getConfig().getInt( + YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS, + YarnConfiguration. + DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT)); + dsResp.setContainerIdStart( + this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT); + + // Set nodes to be used for scheduling + // TODO: The actual computation of the list will happen in YARN-4412 + // TODO: Till then, send the complete list + dsResp.setNodesForScheduling( + new ArrayList<>(this.rmContext.getRMNodes().keySet())); + return dsResp; + } + + @Override + public DistSchedAllocateResponse allocateForDistributedScheduling + (AllocateRequest request) throws YarnException, IOException { + AllocateResponse response = allocate(request); + DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance + (DistSchedAllocateResponse.class); + dsResp.setAllocateResponse(response); + dsResp.setNodesForScheduling( + new ArrayList<>(this.rmContext.getRMNodes().keySet())); + return dsResp; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d3112ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 2744bb4..2fc940b 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -134,6 +134,11 @@ public class ResourceManager extends CompositeService implements Recoverable { */ public static final int SHUTDOWN_HOOK_PRIORITY = 30; + /** + * Used for generation of various ids. + */ + public static final int EPOCH_BIT_SHIFT = 40; + private static final Log LOG = LogFactory.getLog(ResourceManager.class); private static long clusterTimeStamp = System.currentTimeMillis(); @@ -1226,6 +1231,11 @@ public class ResourceManager extends CompositeService implements Recoverable { } protected ApplicationMasterService createApplicationMasterService() { + if (this.rmContext.getYarnConfiguration().getBoolean( + YarnConfiguration.DIST_SCHEDULING_ENABLED, + YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) { + return new DistributedSchedulingService(this.rmContext, scheduler); + } return new ApplicationMasterService(this.rmContext, scheduler); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d3112ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index a61001e..568fd4b 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -91,7 +91,8 @@ public class AppSchedulingInfo { this.queue = queue; this.user = user; this.activeUsersManager = activeUsersManager; - this.containerIdCounter = new AtomicLong(epoch << EPOCH_BIT_SHIFT); + this.containerIdCounter = + new AtomicLong(epoch << EPOCH_BIT_SHIFT); this.appResourceUsage = appResourceUsage; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d3112ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index d5b64c1..6182b07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -93,6 +93,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; + + import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.apache.log4j.Level; @@ -737,6 +739,21 @@ public class MockRM extends ResourceManager { @Override protected ApplicationMasterService createApplicationMasterService() { + if (this.rmContext.getYarnConfiguration().getBoolean( + YarnConfiguration.DIST_SCHEDULING_ENABLED, + YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) { + return new DistributedSchedulingService(getRMContext(), scheduler) { + @Override + protected void serviceStart() { + // override to not start rpc handler + } + + @Override + protected void serviceStop() { + // don't do anything + } + }; + } return new ApplicationMasterService(getRMContext(), scheduler) { @Override protected void serviceStart() {