This is an automated email from the ASF dual-hosted git repository. abmodi pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 4d3c580 YARN-9859. Refactoring of OpportunisticContainerAllocator. Contributed by Abhishek Modi. 4d3c580 is described below commit 4d3c580b03475a6ec9323d11e6875c542f8e3f6d Author: Abhishek Modi <abm...@apache.org> AuthorDate: Mon Sep 30 23:40:15 2019 +0530 YARN-9859. Refactoring of OpportunisticContainerAllocator. Contributed by Abhishek Modi. --- ...DistributedOpportunisticContainerAllocator.java | 357 +++++++++++++++++++++ .../scheduler/OpportunisticContainerAllocator.java | 347 +++----------------- .../TestOpportunisticContainerAllocator.java | 2 +- .../yarn/server/nodemanager/NodeManager.java | 3 +- .../scheduler/TestDistributedScheduler.java | 4 +- .../OpportunisticContainerAllocatorAMService.java | 8 +- 6 files changed, 416 insertions(+), 305 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/DistributedOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/DistributedOpportunisticContainerAllocator.java new file mode 100644 index 0000000..da90167 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/DistributedOpportunisticContainerAllocator.java @@ -0,0 +1,357 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.scheduler; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; +import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; +import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * <p> + * The DistributedOpportunisticContainerAllocator allocates containers on a + * given list of nodes, after modifying the container sizes to respect the + * limits set by the ResourceManager. It tries to distribute the containers + * as evenly as possible. + * </p> + */ +public class DistributedOpportunisticContainerAllocator + extends OpportunisticContainerAllocator { + + private static final int NODE_LOCAL_LOOP = 0; + private static final int RACK_LOCAL_LOOP = 1; + private static final int OFF_SWITCH_LOOP = 2; + + private static final Logger LOG = + LoggerFactory.getLogger(DistributedOpportunisticContainerAllocator.class); + + /** + * Create a new Opportunistic Container Allocator. + * @param tokenSecretManager TokenSecretManager + */ + public DistributedOpportunisticContainerAllocator( + BaseContainerTokenSecretManager tokenSecretManager) { + super(tokenSecretManager); + } + + /** + * Create a new Opportunistic Container Allocator. + * @param tokenSecretManager TokenSecretManager + * @param maxAllocationsPerAMHeartbeat max number of containers to be + * allocated in one AM heartbeat + */ + public DistributedOpportunisticContainerAllocator( + BaseContainerTokenSecretManager tokenSecretManager, + int maxAllocationsPerAMHeartbeat) { + super(tokenSecretManager, maxAllocationsPerAMHeartbeat); + } + + @Override + public List<Container> allocateContainers(ResourceBlacklistRequest blackList, + List<ResourceRequest> oppResourceReqs, + ApplicationAttemptId applicationAttemptId, + OpportunisticContainerContext opportContext, long rmIdentifier, + String appSubmitter) throws YarnException { + + // Update black list. + updateBlacklist(blackList, opportContext); + + // Add OPPORTUNISTIC requests to the outstanding ones. + opportContext.addToOutstandingReqs(oppResourceReqs); + Set<String> nodeBlackList = new HashSet<>(opportContext.getBlacklist()); + Set<String> allocatedNodes = new HashSet<>(); + List<Container> allocatedContainers = new ArrayList<>(); + + // Satisfy the outstanding OPPORTUNISTIC requests. + boolean continueLoop = true; + while (continueLoop) { + continueLoop = false; + List<Map<Resource, List<Allocation>>> allocations = new ArrayList<>(); + for (SchedulerRequestKey schedulerKey : + opportContext.getOutstandingOpReqs().descendingKeySet()) { + // Allocated containers : + // Key = Requested Capability, + // Value = List of Containers of given cap (the actual container size + // might be different than what is requested, which is why + // we need the requested capability (key) to match against + // the outstanding reqs) + int remAllocs = -1; + int maxAllocationsPerAMHeartbeat = getMaxAllocationsPerAMHeartbeat(); + if (maxAllocationsPerAMHeartbeat > 0) { + remAllocs = + maxAllocationsPerAMHeartbeat - allocatedContainers.size() + - getTotalAllocations(allocations); + if (remAllocs <= 0) { + LOG.info("Not allocating more containers as we have reached max " + + "allocations per AM heartbeat {}", + maxAllocationsPerAMHeartbeat); + break; + } + } + Map<Resource, List<Allocation>> allocation = allocate( + rmIdentifier, opportContext, schedulerKey, applicationAttemptId, + appSubmitter, nodeBlackList, allocatedNodes, remAllocs); + if (allocation.size() > 0) { + allocations.add(allocation); + continueLoop = true; + } + } + matchAllocation(allocations, allocatedContainers, opportContext); + } + + return allocatedContainers; + } + + private Map<Resource, List<Allocation>> allocate(long rmIdentifier, + OpportunisticContainerContext appContext, SchedulerRequestKey schedKey, + ApplicationAttemptId appAttId, String userName, Set<String> blackList, + Set<String> allocatedNodes, int maxAllocations) + throws YarnException { + Map<Resource, List<Allocation>> containers = new HashMap<>(); + for (EnrichedResourceRequest enrichedAsk : + appContext.getOutstandingOpReqs().get(schedKey).values()) { + int remainingAllocs = -1; + if (maxAllocations > 0) { + int totalAllocated = 0; + for (List<Allocation> allocs : containers.values()) { + totalAllocated += allocs.size(); + } + remainingAllocs = maxAllocations - totalAllocated; + if (remainingAllocs <= 0) { + LOG.info("Not allocating more containers as max allocations per AM " + + "heartbeat {} has reached", getMaxAllocationsPerAMHeartbeat()); + break; + } + } + allocateContainersInternal(rmIdentifier, appContext.getAppParams(), + appContext.getContainerIdGenerator(), blackList, allocatedNodes, + appAttId, appContext.getNodeMap(), userName, containers, enrichedAsk, + remainingAllocs); + ResourceRequest anyAsk = enrichedAsk.getRequest(); + if (!containers.isEmpty()) { + LOG.info("Opportunistic allocation requested for [priority={}, " + + "allocationRequestId={}, num_containers={}, capability={}] " + + "allocated = {}", anyAsk.getPriority(), + anyAsk.getAllocationRequestId(), anyAsk.getNumContainers(), + anyAsk.getCapability(), containers.keySet()); + } + } + return containers; + } + + private void allocateContainersInternal(long rmIdentifier, + AllocationParams appParams, ContainerIdGenerator idCounter, + Set<String> blacklist, Set<String> allocatedNodes, + ApplicationAttemptId id, Map<String, RemoteNode> allNodes, + String userName, Map<Resource, List<Allocation>> allocations, + EnrichedResourceRequest enrichedAsk, int maxAllocations) + throws YarnException { + if (allNodes.size() == 0) { + LOG.info("No nodes currently available to " + + "allocate OPPORTUNISTIC containers."); + return; + } + ResourceRequest anyAsk = enrichedAsk.getRequest(); + int toAllocate = anyAsk.getNumContainers() + - (allocations.isEmpty() ? 0 : + allocations.get(anyAsk.getCapability()).size()); + toAllocate = Math.min(toAllocate, + appParams.getMaxAllocationsPerSchedulerKeyPerRound()); + if (maxAllocations >= 0) { + toAllocate = Math.min(maxAllocations, toAllocate); + } + int numAllocated = 0; + // Node Candidates are selected as follows: + // * Node local candidates selected in loop == 0 + // * Rack local candidates selected in loop == 1 + // * From loop == 2 onwards, we revert to off switch allocations. + int loopIndex = OFF_SWITCH_LOOP; + if (enrichedAsk.getNodeLocations().size() > 0) { + loopIndex = NODE_LOCAL_LOOP; + } + while (numAllocated < toAllocate) { + Collection<RemoteNode> nodeCandidates = + findNodeCandidates(loopIndex, allNodes, blacklist, allocatedNodes, + enrichedAsk); + for (RemoteNode rNode : nodeCandidates) { + String rNodeHost = rNode.getNodeId().getHost(); + // Ignore black list + if (blacklist.contains(rNodeHost)) { + LOG.info("Nodes for scheduling has a blacklisted node" + + " [" + rNodeHost + "].."); + continue; + } + String location = ResourceRequest.ANY; + if (loopIndex == NODE_LOCAL_LOOP) { + if (enrichedAsk.getNodeLocations().contains(rNodeHost)) { + location = rNodeHost; + } else { + continue; + } + } else if (allocatedNodes.contains(rNodeHost)) { + LOG.info("Opportunistic container has already been allocated on {}.", + rNodeHost); + continue; + } + if (loopIndex == RACK_LOCAL_LOOP) { + if (enrichedAsk.getRackLocations().contains(rNode.getRackName())) { + location = rNode.getRackName(); + } else { + continue; + } + } + Container container = createContainer(rmIdentifier, appParams, + idCounter, id, userName, allocations, location, + anyAsk, rNode); + numAllocated++; + updateMetrics(loopIndex); + allocatedNodes.add(rNodeHost); + LOG.info("Allocated [" + container.getId() + "] as opportunistic at " + + "location [" + location + "]"); + if (numAllocated >= toAllocate) { + break; + } + } + if (loopIndex == NODE_LOCAL_LOOP && + enrichedAsk.getRackLocations().size() > 0) { + loopIndex = RACK_LOCAL_LOOP; + } else { + loopIndex++; + } + // Handle case where there are no nodes remaining after blacklist is + // considered. + if (loopIndex > OFF_SWITCH_LOOP && numAllocated == 0) { + LOG.warn("Unable to allocate any opportunistic containers."); + break; + } + } + } + + + + private void updateMetrics(int loopIndex) { + OpportunisticSchedulerMetrics metrics = + OpportunisticSchedulerMetrics.getMetrics(); + if (loopIndex == NODE_LOCAL_LOOP) { + metrics.incrNodeLocalOppContainers(); + } else if (loopIndex == RACK_LOCAL_LOOP) { + metrics.incrRackLocalOppContainers(); + } else { + metrics.incrOffSwitchOppContainers(); + } + } + + private Collection<RemoteNode> findNodeCandidates(int loopIndex, + Map<String, RemoteNode> allNodes, Set<String> blackList, + Set<String> allocatedNodes, EnrichedResourceRequest enrichedRR) { + LinkedList<RemoteNode> retList = new LinkedList<>(); + String partition = getRequestPartition(enrichedRR); + if (loopIndex > 1) { + for (RemoteNode remoteNode : allNodes.values()) { + if (StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) { + retList.add(remoteNode); + } + } + return retList; + } else { + + int numContainers = enrichedRR.getRequest().getNumContainers(); + while (numContainers > 0) { + if (loopIndex == 0) { + // Node local candidates + numContainers = collectNodeLocalCandidates( + allNodes, enrichedRR, retList, numContainers); + } else { + // Rack local candidates + numContainers = + collectRackLocalCandidates(allNodes, enrichedRR, retList, + blackList, allocatedNodes, numContainers); + } + if (numContainers == enrichedRR.getRequest().getNumContainers()) { + // If there is no change in numContainers, then there is no point + // in looping again. + break; + } + } + return retList; + } + } + + private int collectRackLocalCandidates(Map<String, RemoteNode> allNodes, + EnrichedResourceRequest enrichedRR, LinkedList<RemoteNode> retList, + Set<String> blackList, Set<String> allocatedNodes, int numContainers) { + String partition = getRequestPartition(enrichedRR); + for (RemoteNode rNode : allNodes.values()) { + if (StringUtils.equals(partition, getRemoteNodePartition(rNode)) && + enrichedRR.getRackLocations().contains(rNode.getRackName())) { + String rHost = rNode.getNodeId().getHost(); + if (blackList.contains(rHost)) { + continue; + } + if (allocatedNodes.contains(rHost)) { + retList.addLast(rNode); + } else { + retList.addFirst(rNode); + numContainers--; + } + } + if (numContainers == 0) { + break; + } + } + return numContainers; + } + + private int collectNodeLocalCandidates(Map<String, RemoteNode> allNodes, + EnrichedResourceRequest enrichedRR, List<RemoteNode> retList, + int numContainers) { + String partition = getRequestPartition(enrichedRR); + for (String nodeName : enrichedRR.getNodeLocations()) { + RemoteNode remoteNode = allNodes.get(nodeName); + if (remoteNode != null && + StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) { + retList.add(remoteNode); + numContainers--; + } + if (numContainers == 0) { + break; + } + } + return numContainers; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 0ce1976..4a17a65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.scheduler; import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.util.Time; @@ -38,21 +37,15 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; -import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -61,16 +54,11 @@ import java.util.concurrent.atomic.AtomicLong; /** * <p> - * The OpportunisticContainerAllocator allocates containers on a given list of - * nodes, after modifying the container sizes to respect the limits set by the - * ResourceManager. It tries to distribute the containers as evenly as possible. + * Base abstract class for Opportunistic container allocations, that provides + * common functions required for Opportunistic container allocation. * </p> */ -public class OpportunisticContainerAllocator { - - private static final int NODE_LOCAL_LOOP = 0; - private static final int RACK_LOCAL_LOOP = 1; - private static final int OFF_SWITCH_LOOP = 2; +public abstract class OpportunisticContainerAllocator { private int maxAllocationsPerAMHeartbeat = -1; @@ -212,9 +200,6 @@ public class OpportunisticContainerAllocator { } } - private static final Logger LOG = - LoggerFactory.getLogger(OpportunisticContainerAllocator.class); - private static final ResourceCalculator RESOURCE_CALCULATOR = new DominantResourceCalculator(); @@ -238,26 +223,30 @@ public class OpportunisticContainerAllocator { } } - static class EnrichedResourceRequest { + /** + * This class encapsulates Resource Request and provides requests per + * node and rack. + */ + public static class EnrichedResourceRequest { private final Map<String, AtomicInteger> nodeLocations = new HashMap<>(); private final Map<String, AtomicInteger> rackLocations = new HashMap<>(); private final ResourceRequest request; private final long timestamp; - EnrichedResourceRequest(ResourceRequest request) { + public EnrichedResourceRequest(ResourceRequest request) { this.request = request; timestamp = Time.monotonicNow(); } - long getTimestamp() { + public long getTimestamp() { return timestamp; } - ResourceRequest getRequest() { + public ResourceRequest getRequest() { return request; } - void addLocation(String location, int count) { + public void addLocation(String location, int count) { Map<String, AtomicInteger> m = rackLocations; if (!location.startsWith("/")) { m = nodeLocations; @@ -269,7 +258,7 @@ public class OpportunisticContainerAllocator { } } - void removeLocation(String location) { + public void removeLocation(String location) { Map<String, AtomicInteger> m = rackLocations; AtomicInteger count = m.get(location); if (count == null) { @@ -284,14 +273,15 @@ public class OpportunisticContainerAllocator { } } - Set<String> getNodeLocations() { + public Set<String> getNodeLocations() { return nodeLocations.keySet(); } - Set<String> getRackLocations() { + public Set<String> getRackLocations() { return rackLocations.keySet(); } } + /** * Create a new Opportunistic Container Allocator. * @param tokenSecretManager TokenSecretManager @@ -320,6 +310,14 @@ public class OpportunisticContainerAllocator { } /** + * Get the Max Allocations per AM heartbeat. + * @return maxAllocationsPerAMHeartbeat. + */ + public int getMaxAllocationsPerAMHeartbeat() { + return this.maxAllocationsPerAMHeartbeat; + } + + /** * Allocate OPPORTUNISTIC containers. * @param blackList Resource BlackList Request * @param oppResourceReqs Opportunistic Resource Requests @@ -330,72 +328,37 @@ public class OpportunisticContainerAllocator { * @return List of Containers. * @throws YarnException YarnException */ - public List<Container> allocateContainers(ResourceBlacklistRequest blackList, + public abstract List<Container> allocateContainers( + ResourceBlacklistRequest blackList, List<ResourceRequest> oppResourceReqs, ApplicationAttemptId applicationAttemptId, OpportunisticContainerContext opportContext, long rmIdentifier, - String appSubmitter) throws YarnException { + String appSubmitter) throws YarnException; + - // Update black list. + protected void updateBlacklist(ResourceBlacklistRequest blackList, + OpportunisticContainerContext oppContext) { if (blackList != null) { - opportContext.getBlacklist().removeAll(blackList.getBlacklistRemovals()); - opportContext.getBlacklist().addAll(blackList.getBlacklistAdditions()); + oppContext.getBlacklist().removeAll(blackList.getBlacklistRemovals()); + oppContext.getBlacklist().addAll(blackList.getBlacklistAdditions()); } + } - // Add OPPORTUNISTIC requests to the outstanding ones. - opportContext.addToOutstandingReqs(oppResourceReqs); - Set<String> nodeBlackList = new HashSet<>(opportContext.getBlacklist()); - Set<String> allocatedNodes = new HashSet<>(); - List<Container> allocatedContainers = new ArrayList<>(); - - // Satisfy the outstanding OPPORTUNISTIC requests. - boolean continueLoop = true; - while (continueLoop) { - continueLoop = false; - List<Map<Resource, List<Allocation>>> allocations = new ArrayList<>(); - for (SchedulerRequestKey schedulerKey : - opportContext.getOutstandingOpReqs().descendingKeySet()) { - // Allocated containers : - // Key = Requested Capability, - // Value = List of Containers of given cap (the actual container size - // might be different than what is requested, which is why - // we need the requested capability (key) to match against - // the outstanding reqs) - int remAllocs = -1; - if (maxAllocationsPerAMHeartbeat > 0) { - remAllocs = - maxAllocationsPerAMHeartbeat - allocatedContainers.size() - - getTotalAllocations(allocations); - if (remAllocs <= 0) { - LOG.info("Not allocating more containers as we have reached max " - + "allocations per AM heartbeat {}", - maxAllocationsPerAMHeartbeat); - break; - } - } - Map<Resource, List<Allocation>> allocation = allocate( - rmIdentifier, opportContext, schedulerKey, applicationAttemptId, - appSubmitter, nodeBlackList, allocatedNodes, remAllocs); - if (allocation.size() > 0) { - allocations.add(allocation); - continueLoop = true; - } - } - for (Map<Resource, List<Allocation>> allocation : allocations) { - for (Map.Entry<Resource, List<Allocation>> e : allocation.entrySet()) { - opportContext.matchAllocationToOutstandingRequest( - e.getKey(), e.getValue()); - for (Allocation alloc : e.getValue()) { - allocatedContainers.add(alloc.getContainer()); - } + protected void matchAllocation(List<Map<Resource, + List<Allocation>>> allocations, List<Container> allocatedContainers, + OpportunisticContainerContext oppContext) { + for (Map<Resource, List<Allocation>> allocation : allocations) { + for (Map.Entry<Resource, List<Allocation>> e : allocation.entrySet()) { + oppContext.matchAllocationToOutstandingRequest( + e.getKey(), e.getValue()); + for (Allocation alloc : e.getValue()) { + allocatedContainers.add(alloc.getContainer()); } } } - - return allocatedContainers; } - private int getTotalAllocations( + protected int getTotalAllocations( List<Map<Resource, List<Allocation>>> allocations) { int totalAllocs = 0; for (Map<Resource, List<Allocation>> allocation : allocations) { @@ -406,223 +369,8 @@ public class OpportunisticContainerAllocator { return totalAllocs; } - private Map<Resource, List<Allocation>> allocate(long rmIdentifier, - OpportunisticContainerContext appContext, SchedulerRequestKey schedKey, - ApplicationAttemptId appAttId, String userName, Set<String> blackList, - Set<String> allocatedNodes, int maxAllocations) - throws YarnException { - Map<Resource, List<Allocation>> containers = new HashMap<>(); - for (EnrichedResourceRequest enrichedAsk : - appContext.getOutstandingOpReqs().get(schedKey).values()) { - int remainingAllocs = -1; - if (maxAllocations > 0) { - int totalAllocated = 0; - for (List<Allocation> allocs : containers.values()) { - totalAllocated += allocs.size(); - } - remainingAllocs = maxAllocations - totalAllocated; - if (remainingAllocs <= 0) { - LOG.info("Not allocating more containers as max allocations per AM " - + "heartbeat {} has reached", maxAllocationsPerAMHeartbeat); - break; - } - } - allocateContainersInternal(rmIdentifier, appContext.getAppParams(), - appContext.getContainerIdGenerator(), blackList, allocatedNodes, - appAttId, appContext.getNodeMap(), userName, containers, enrichedAsk, - remainingAllocs); - ResourceRequest anyAsk = enrichedAsk.getRequest(); - if (!containers.isEmpty()) { - LOG.info("Opportunistic allocation requested for [priority={}, " - + "allocationRequestId={}, num_containers={}, capability={}] " - + "allocated = {}", anyAsk.getPriority(), - anyAsk.getAllocationRequestId(), anyAsk.getNumContainers(), - anyAsk.getCapability(), containers.keySet()); - } - } - return containers; - } - - private void allocateContainersInternal(long rmIdentifier, - AllocationParams appParams, ContainerIdGenerator idCounter, - Set<String> blacklist, Set<String> allocatedNodes, - ApplicationAttemptId id, Map<String, RemoteNode> allNodes, - String userName, Map<Resource, List<Allocation>> allocations, - EnrichedResourceRequest enrichedAsk, int maxAllocations) - throws YarnException { - if (allNodes.size() == 0) { - LOG.info("No nodes currently available to " + - "allocate OPPORTUNISTIC containers."); - return; - } - ResourceRequest anyAsk = enrichedAsk.getRequest(); - int toAllocate = anyAsk.getNumContainers() - - (allocations.isEmpty() ? 0 : - allocations.get(anyAsk.getCapability()).size()); - toAllocate = Math.min(toAllocate, - appParams.getMaxAllocationsPerSchedulerKeyPerRound()); - if (maxAllocations >= 0) { - toAllocate = Math.min(maxAllocations, toAllocate); - } - int numAllocated = 0; - // Node Candidates are selected as follows: - // * Node local candidates selected in loop == 0 - // * Rack local candidates selected in loop == 1 - // * From loop == 2 onwards, we revert to off switch allocations. - int loopIndex = OFF_SWITCH_LOOP; - if (enrichedAsk.getNodeLocations().size() > 0) { - loopIndex = NODE_LOCAL_LOOP; - } - while (numAllocated < toAllocate) { - Collection<RemoteNode> nodeCandidates = - findNodeCandidates(loopIndex, allNodes, blacklist, allocatedNodes, - enrichedAsk); - for (RemoteNode rNode : nodeCandidates) { - String rNodeHost = rNode.getNodeId().getHost(); - // Ignore black list - if (blacklist.contains(rNodeHost)) { - LOG.info("Nodes for scheduling has a blacklisted node" + - " [" + rNodeHost + "].."); - continue; - } - String location = ResourceRequest.ANY; - if (loopIndex == NODE_LOCAL_LOOP) { - if (enrichedAsk.getNodeLocations().contains(rNodeHost)) { - location = rNodeHost; - } else { - continue; - } - } else if (allocatedNodes.contains(rNodeHost)) { - LOG.info("Opportunistic container has already been allocated on {}.", - rNodeHost); - continue; - } - if (loopIndex == RACK_LOCAL_LOOP) { - if (enrichedAsk.getRackLocations().contains(rNode.getRackName())) { - location = rNode.getRackName(); - } else { - continue; - } - } - Container container = createContainer(rmIdentifier, appParams, - idCounter, id, userName, allocations, location, - anyAsk, rNode); - numAllocated++; - updateMetrics(loopIndex); - allocatedNodes.add(rNodeHost); - LOG.info("Allocated [" + container.getId() + "] as opportunistic at " + - "location [" + location + "]"); - if (numAllocated >= toAllocate) { - break; - } - } - if (loopIndex == NODE_LOCAL_LOOP && - enrichedAsk.getRackLocations().size() > 0) { - loopIndex = RACK_LOCAL_LOOP; - } else { - loopIndex++; - } - // Handle case where there are no nodes remaining after blacklist is - // considered. - if (loopIndex > OFF_SWITCH_LOOP && numAllocated == 0) { - LOG.warn("Unable to allocate any opportunistic containers."); - break; - } - } - } - - private void updateMetrics(int loopIndex) { - OpportunisticSchedulerMetrics metrics = - OpportunisticSchedulerMetrics.getMetrics(); - if (loopIndex == NODE_LOCAL_LOOP) { - metrics.incrNodeLocalOppContainers(); - } else if (loopIndex == RACK_LOCAL_LOOP) { - metrics.incrRackLocalOppContainers(); - } else { - metrics.incrOffSwitchOppContainers(); - } - } - - private Collection<RemoteNode> findNodeCandidates(int loopIndex, - Map<String, RemoteNode> allNodes, Set<String> blackList, - Set<String> allocatedNodes, EnrichedResourceRequest enrichedRR) { - LinkedList<RemoteNode> retList = new LinkedList<>(); - String partition = getRequestPartition(enrichedRR); - if (loopIndex > 1) { - for (RemoteNode remoteNode : allNodes.values()) { - if (StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) { - retList.add(remoteNode); - } - } - return retList; - } else { - - int numContainers = enrichedRR.getRequest().getNumContainers(); - while (numContainers > 0) { - if (loopIndex == 0) { - // Node local candidates - numContainers = collectNodeLocalCandidates( - allNodes, enrichedRR, retList, numContainers); - } else { - // Rack local candidates - numContainers = - collectRackLocalCandidates(allNodes, enrichedRR, retList, - blackList, allocatedNodes, numContainers); - } - if (numContainers == enrichedRR.getRequest().getNumContainers()) { - // If there is no change in numContainers, then there is no point - // in looping again. - break; - } - } - return retList; - } - } - - private int collectRackLocalCandidates(Map<String, RemoteNode> allNodes, - EnrichedResourceRequest enrichedRR, LinkedList<RemoteNode> retList, - Set<String> blackList, Set<String> allocatedNodes, int numContainers) { - String partition = getRequestPartition(enrichedRR); - for (RemoteNode rNode : allNodes.values()) { - if (StringUtils.equals(partition, getRemoteNodePartition(rNode)) && - enrichedRR.getRackLocations().contains(rNode.getRackName())) { - String rHost = rNode.getNodeId().getHost(); - if (blackList.contains(rHost)) { - continue; - } - if (allocatedNodes.contains(rHost)) { - retList.addLast(rNode); - } else { - retList.addFirst(rNode); - numContainers--; - } - } - if (numContainers == 0) { - break; - } - } - return numContainers; - } - - private int collectNodeLocalCandidates(Map<String, RemoteNode> allNodes, - EnrichedResourceRequest enrichedRR, List<RemoteNode> retList, - int numContainers) { - String partition = getRequestPartition(enrichedRR); - for (String nodeName : enrichedRR.getNodeLocations()) { - RemoteNode remoteNode = allNodes.get(nodeName); - if (remoteNode != null && - StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) { - retList.add(remoteNode); - numContainers--; - } - if (numContainers == 0) { - break; - } - } - return numContainers; - } - - private Container createContainer(long rmIdentifier, + @SuppressWarnings("checkstyle:parameternumber") + protected Container createContainer(long rmIdentifier, AllocationParams appParams, ContainerIdGenerator idCounter, ApplicationAttemptId id, String userName, Map<Resource, List<Allocation>> allocations, String location, @@ -654,6 +402,7 @@ public class OpportunisticContainerAllocator { SchedulerRequestKey.create(rr), userName, node, cId, capability); } + @SuppressWarnings("checkstyle:parameternumber") private Container createContainer(long rmIdentifier, long tokenExpiry, SchedulerRequestKey schedulerKey, String userName, RemoteNode node, ContainerId cId, Resource capability) { @@ -718,7 +467,7 @@ public class OpportunisticContainerAllocator { return partitionedRequests; } - private String getRequestPartition(EnrichedResourceRequest enrichedRR) { + protected String getRequestPartition(EnrichedResourceRequest enrichedRR) { String partition = enrichedRR.getRequest().getNodeLabelExpression(); if (partition == null) { partition = CommonNodeLabelsManager.NO_LABEL; @@ -726,7 +475,7 @@ public class OpportunisticContainerAllocator { return partition; } - private String getRemoteNodePartition(RemoteNode node) { + protected String getRemoteNodePartition(RemoteNode node) { String partition = node.getNodePartition(); if (partition == null) { partition = CommonNodeLabelsManager.NO_LABEL; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java index 548ddad..6a91f41 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java @@ -98,7 +98,7 @@ public class TestOpportunisticContainerAllocator { return new byte[]{1, 2}; } }; - allocator = new OpportunisticContainerAllocator(secMan); + allocator = new DistributedOpportunisticContainerAllocator(secMan); oppCntxt = new OpportunisticContainerContext(); oppCntxt.getAppParams().setMinResource(Resource.newInstance(1024, 1)); oppCntxt.getAppParams().setIncrementResource(Resource.newInstance(512, 1)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index db3aaca..4bbae34 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecret import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer; +import org.apache.hadoop.yarn.server.scheduler.DistributedOpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.state.MultiStateTransitionListener; @@ -479,7 +480,7 @@ public class NodeManager extends CompositeService YarnConfiguration. DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT); ((NMContext) context).setQueueableContainerAllocator( - new OpportunisticContainerAllocator( + new DistributedOpportunisticContainerAllocator( context.getContainerTokenSecretManager(), maxAllocationsPerAMHeartbeat)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java index dee2a20..5a0715e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.scheduler.DistributedOpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; @@ -232,7 +233,8 @@ public class TestDistributedScheduler { }; nmContainerTokenSecretManager.setMasterKey(mKey); OpportunisticContainerAllocator containerAllocator = - new OpportunisticContainerAllocator(nmContainerTokenSecretManager); + new DistributedOpportunisticContainerAllocator( + nmContainerTokenSecretManager); NMTokenSecretManagerInNM nmTokenSecretManagerInNM = new NMTokenSecretManagerInNM(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index a360ed2..4475caf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; +import org.apache.hadoop.yarn.server.scheduler.DistributedOpportunisticContainerAllocator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -233,9 +234,10 @@ public class OpportunisticContainerAllocatorAMService YarnConfiguration.OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT, YarnConfiguration. DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT); - this.oppContainerAllocator = new OpportunisticContainerAllocator( - rmContext.getContainerTokenSecretManager(), - maxAllocationsPerAMHeartbeat); + this.oppContainerAllocator = + new DistributedOpportunisticContainerAllocator( + rmContext.getContainerTokenSecretManager(), + maxAllocationsPerAMHeartbeat); this.k = rmContext.getYarnConfiguration().getInt( YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED, YarnConfiguration.DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org