This is an automated email from the ASF dual-hosted git repository.

abmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4d3c580  YARN-9859. Refactoring of OpportunisticContainerAllocator. 
Contributed by Abhishek Modi.
4d3c580 is described below

commit 4d3c580b03475a6ec9323d11e6875c542f8e3f6d
Author: Abhishek Modi <abm...@apache.org>
AuthorDate: Mon Sep 30 23:40:15 2019 +0530

    YARN-9859. Refactoring of OpportunisticContainerAllocator. Contributed by 
Abhishek Modi.
---
 ...DistributedOpportunisticContainerAllocator.java | 357 +++++++++++++++++++++
 .../scheduler/OpportunisticContainerAllocator.java | 347 +++-----------------
 .../TestOpportunisticContainerAllocator.java       |   2 +-
 .../yarn/server/nodemanager/NodeManager.java       |   3 +-
 .../scheduler/TestDistributedScheduler.java        |   4 +-
 .../OpportunisticContainerAllocatorAMService.java  |   8 +-
 6 files changed, 416 insertions(+), 305 deletions(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/DistributedOpportunisticContainerAllocator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/DistributedOpportunisticContainerAllocator.java
new file mode 100644
index 0000000..da90167
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/DistributedOpportunisticContainerAllocator.java
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.scheduler;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
+import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
+import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * <p>
+ * The DistributedOpportunisticContainerAllocator allocates containers on a
+ * given list of nodes, after modifying the container sizes to respect the
+ * limits set by the ResourceManager. It tries to distribute the containers
+ * as evenly as possible.
+ * </p>
+ */
+public class DistributedOpportunisticContainerAllocator
+    extends OpportunisticContainerAllocator {
+
+  private static final int NODE_LOCAL_LOOP = 0;
+  private static final int RACK_LOCAL_LOOP = 1;
+  private static final int OFF_SWITCH_LOOP = 2;
+
+  private static final Logger LOG =
+      
LoggerFactory.getLogger(DistributedOpportunisticContainerAllocator.class);
+
+  /**
+   * Create a new Opportunistic Container Allocator.
+   * @param tokenSecretManager TokenSecretManager
+   */
+  public DistributedOpportunisticContainerAllocator(
+      BaseContainerTokenSecretManager tokenSecretManager) {
+    super(tokenSecretManager);
+  }
+
+  /**
+   * Create a new Opportunistic Container Allocator.
+   * @param tokenSecretManager TokenSecretManager
+   * @param maxAllocationsPerAMHeartbeat max number of containers to be
+   *                                     allocated in one AM heartbeat
+   */
+  public DistributedOpportunisticContainerAllocator(
+      BaseContainerTokenSecretManager tokenSecretManager,
+      int maxAllocationsPerAMHeartbeat) {
+    super(tokenSecretManager, maxAllocationsPerAMHeartbeat);
+  }
+
+  @Override
+  public List<Container> allocateContainers(ResourceBlacklistRequest blackList,
+      List<ResourceRequest> oppResourceReqs,
+      ApplicationAttemptId applicationAttemptId,
+      OpportunisticContainerContext opportContext, long rmIdentifier,
+      String appSubmitter) throws YarnException {
+
+    // Update black list.
+    updateBlacklist(blackList, opportContext);
+
+    // Add OPPORTUNISTIC requests to the outstanding ones.
+    opportContext.addToOutstandingReqs(oppResourceReqs);
+    Set<String> nodeBlackList = new HashSet<>(opportContext.getBlacklist());
+    Set<String> allocatedNodes = new HashSet<>();
+    List<Container> allocatedContainers = new ArrayList<>();
+
+    // Satisfy the outstanding OPPORTUNISTIC requests.
+    boolean continueLoop = true;
+    while (continueLoop) {
+      continueLoop = false;
+      List<Map<Resource, List<Allocation>>> allocations = new ArrayList<>();
+      for (SchedulerRequestKey schedulerKey :
+          opportContext.getOutstandingOpReqs().descendingKeySet()) {
+        // Allocated containers :
+        //  Key = Requested Capability,
+        //  Value = List of Containers of given cap (the actual container size
+        //          might be different than what is requested, which is why
+        //          we need the requested capability (key) to match against
+        //          the outstanding reqs)
+        int remAllocs = -1;
+        int maxAllocationsPerAMHeartbeat = getMaxAllocationsPerAMHeartbeat();
+        if (maxAllocationsPerAMHeartbeat > 0) {
+          remAllocs =
+              maxAllocationsPerAMHeartbeat - allocatedContainers.size()
+                  - getTotalAllocations(allocations);
+          if (remAllocs <= 0) {
+            LOG.info("Not allocating more containers as we have reached max "
+                    + "allocations per AM heartbeat {}",
+                maxAllocationsPerAMHeartbeat);
+            break;
+          }
+        }
+        Map<Resource, List<Allocation>> allocation = allocate(
+            rmIdentifier, opportContext, schedulerKey, applicationAttemptId,
+            appSubmitter, nodeBlackList, allocatedNodes, remAllocs);
+        if (allocation.size() > 0) {
+          allocations.add(allocation);
+          continueLoop = true;
+        }
+      }
+      matchAllocation(allocations, allocatedContainers, opportContext);
+    }
+
+    return allocatedContainers;
+  }
+
+  private Map<Resource, List<Allocation>> allocate(long rmIdentifier,
+      OpportunisticContainerContext appContext, SchedulerRequestKey schedKey,
+      ApplicationAttemptId appAttId, String userName, Set<String> blackList,
+      Set<String> allocatedNodes, int maxAllocations)
+      throws YarnException {
+    Map<Resource, List<Allocation>> containers = new HashMap<>();
+    for (EnrichedResourceRequest enrichedAsk :
+        appContext.getOutstandingOpReqs().get(schedKey).values()) {
+      int remainingAllocs = -1;
+      if (maxAllocations > 0) {
+        int totalAllocated = 0;
+        for (List<Allocation> allocs : containers.values()) {
+          totalAllocated += allocs.size();
+        }
+        remainingAllocs = maxAllocations - totalAllocated;
+        if (remainingAllocs <= 0) {
+          LOG.info("Not allocating more containers as max allocations per AM "
+              + "heartbeat {} has reached", getMaxAllocationsPerAMHeartbeat());
+          break;
+        }
+      }
+      allocateContainersInternal(rmIdentifier, appContext.getAppParams(),
+          appContext.getContainerIdGenerator(), blackList, allocatedNodes,
+          appAttId, appContext.getNodeMap(), userName, containers, enrichedAsk,
+          remainingAllocs);
+      ResourceRequest anyAsk = enrichedAsk.getRequest();
+      if (!containers.isEmpty()) {
+        LOG.info("Opportunistic allocation requested for [priority={}, "
+                + "allocationRequestId={}, num_containers={}, capability={}] "
+                + "allocated = {}", anyAsk.getPriority(),
+            anyAsk.getAllocationRequestId(), anyAsk.getNumContainers(),
+            anyAsk.getCapability(), containers.keySet());
+      }
+    }
+    return containers;
+  }
+
+  private void allocateContainersInternal(long rmIdentifier,
+      AllocationParams appParams, ContainerIdGenerator idCounter,
+      Set<String> blacklist, Set<String> allocatedNodes,
+      ApplicationAttemptId id, Map<String, RemoteNode> allNodes,
+      String userName, Map<Resource, List<Allocation>> allocations,
+      EnrichedResourceRequest enrichedAsk, int maxAllocations)
+      throws YarnException {
+    if (allNodes.size() == 0) {
+      LOG.info("No nodes currently available to " +
+          "allocate OPPORTUNISTIC containers.");
+      return;
+    }
+    ResourceRequest anyAsk = enrichedAsk.getRequest();
+    int toAllocate = anyAsk.getNumContainers()
+        - (allocations.isEmpty() ? 0 :
+        allocations.get(anyAsk.getCapability()).size());
+    toAllocate = Math.min(toAllocate,
+        appParams.getMaxAllocationsPerSchedulerKeyPerRound());
+    if (maxAllocations >= 0) {
+      toAllocate = Math.min(maxAllocations, toAllocate);
+    }
+    int numAllocated = 0;
+    // Node Candidates are selected as follows:
+    // * Node local candidates selected in loop == 0
+    // * Rack local candidates selected in loop == 1
+    // * From loop == 2 onwards, we revert to off switch allocations.
+    int loopIndex = OFF_SWITCH_LOOP;
+    if (enrichedAsk.getNodeLocations().size() > 0) {
+      loopIndex = NODE_LOCAL_LOOP;
+    }
+    while (numAllocated < toAllocate) {
+      Collection<RemoteNode> nodeCandidates =
+          findNodeCandidates(loopIndex, allNodes, blacklist, allocatedNodes,
+              enrichedAsk);
+      for (RemoteNode rNode : nodeCandidates) {
+        String rNodeHost = rNode.getNodeId().getHost();
+        // Ignore black list
+        if (blacklist.contains(rNodeHost)) {
+          LOG.info("Nodes for scheduling has a blacklisted node" +
+              " [" + rNodeHost + "]..");
+          continue;
+        }
+        String location = ResourceRequest.ANY;
+        if (loopIndex == NODE_LOCAL_LOOP) {
+          if (enrichedAsk.getNodeLocations().contains(rNodeHost)) {
+            location = rNodeHost;
+          } else {
+            continue;
+          }
+        } else if (allocatedNodes.contains(rNodeHost)) {
+          LOG.info("Opportunistic container has already been allocated on {}.",
+              rNodeHost);
+          continue;
+        }
+        if (loopIndex == RACK_LOCAL_LOOP) {
+          if (enrichedAsk.getRackLocations().contains(rNode.getRackName())) {
+            location = rNode.getRackName();
+          } else {
+            continue;
+          }
+        }
+        Container container = createContainer(rmIdentifier, appParams,
+            idCounter, id, userName, allocations, location,
+            anyAsk, rNode);
+        numAllocated++;
+        updateMetrics(loopIndex);
+        allocatedNodes.add(rNodeHost);
+        LOG.info("Allocated [" + container.getId() + "] as opportunistic at " +
+            "location [" + location + "]");
+        if (numAllocated >= toAllocate) {
+          break;
+        }
+      }
+      if (loopIndex == NODE_LOCAL_LOOP &&
+          enrichedAsk.getRackLocations().size() > 0) {
+        loopIndex = RACK_LOCAL_LOOP;
+      } else {
+        loopIndex++;
+      }
+      // Handle case where there are no nodes remaining after blacklist is
+      // considered.
+      if (loopIndex > OFF_SWITCH_LOOP && numAllocated == 0) {
+        LOG.warn("Unable to allocate any opportunistic containers.");
+        break;
+      }
+    }
+  }
+
+
+
+  private void updateMetrics(int loopIndex) {
+    OpportunisticSchedulerMetrics metrics =
+        OpportunisticSchedulerMetrics.getMetrics();
+    if (loopIndex == NODE_LOCAL_LOOP) {
+      metrics.incrNodeLocalOppContainers();
+    } else if (loopIndex == RACK_LOCAL_LOOP) {
+      metrics.incrRackLocalOppContainers();
+    } else {
+      metrics.incrOffSwitchOppContainers();
+    }
+  }
+
+  private Collection<RemoteNode> findNodeCandidates(int loopIndex,
+      Map<String, RemoteNode> allNodes, Set<String> blackList,
+      Set<String> allocatedNodes, EnrichedResourceRequest enrichedRR) {
+    LinkedList<RemoteNode> retList = new LinkedList<>();
+    String partition = getRequestPartition(enrichedRR);
+    if (loopIndex > 1) {
+      for (RemoteNode remoteNode : allNodes.values()) {
+        if (StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) 
{
+          retList.add(remoteNode);
+        }
+      }
+      return retList;
+    } else {
+
+      int numContainers = enrichedRR.getRequest().getNumContainers();
+      while (numContainers > 0) {
+        if (loopIndex == 0) {
+          // Node local candidates
+          numContainers = collectNodeLocalCandidates(
+              allNodes, enrichedRR, retList, numContainers);
+        } else {
+          // Rack local candidates
+          numContainers =
+              collectRackLocalCandidates(allNodes, enrichedRR, retList,
+                  blackList, allocatedNodes, numContainers);
+        }
+        if (numContainers == enrichedRR.getRequest().getNumContainers()) {
+          // If there is no change in numContainers, then there is no point
+          // in looping again.
+          break;
+        }
+      }
+      return retList;
+    }
+  }
+
+  private int collectRackLocalCandidates(Map<String, RemoteNode> allNodes,
+      EnrichedResourceRequest enrichedRR, LinkedList<RemoteNode> retList,
+      Set<String> blackList, Set<String> allocatedNodes, int numContainers) {
+    String partition = getRequestPartition(enrichedRR);
+    for (RemoteNode rNode : allNodes.values()) {
+      if (StringUtils.equals(partition, getRemoteNodePartition(rNode)) &&
+          enrichedRR.getRackLocations().contains(rNode.getRackName())) {
+        String rHost = rNode.getNodeId().getHost();
+        if (blackList.contains(rHost)) {
+          continue;
+        }
+        if (allocatedNodes.contains(rHost)) {
+          retList.addLast(rNode);
+        } else {
+          retList.addFirst(rNode);
+          numContainers--;
+        }
+      }
+      if (numContainers == 0) {
+        break;
+      }
+    }
+    return numContainers;
+  }
+
+  private int collectNodeLocalCandidates(Map<String, RemoteNode> allNodes,
+      EnrichedResourceRequest enrichedRR, List<RemoteNode> retList,
+      int numContainers) {
+    String partition = getRequestPartition(enrichedRR);
+    for (String nodeName : enrichedRR.getNodeLocations()) {
+      RemoteNode remoteNode = allNodes.get(nodeName);
+      if (remoteNode != null &&
+          StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) {
+        retList.add(remoteNode);
+        numContainers--;
+      }
+      if (numContainers == 0) {
+        break;
+      }
+    }
+    return numContainers;
+  }
+}
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
index 0ce1976..4a17a65 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.server.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.Time;
@@ -38,21 +37,15 @@ import 
org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ContainerType;
 
 import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
-import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
 import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -61,16 +54,11 @@ import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * <p>
- * The OpportunisticContainerAllocator allocates containers on a given list of
- * nodes, after modifying the container sizes to respect the limits set by the
- * ResourceManager. It tries to distribute the containers as evenly as 
possible.
+ * Base abstract class for Opportunistic container allocations, that provides
+ * common functions required for Opportunistic container allocation.
  * </p>
  */
-public class OpportunisticContainerAllocator {
-
-  private static final int NODE_LOCAL_LOOP = 0;
-  private static final int RACK_LOCAL_LOOP = 1;
-  private static final int OFF_SWITCH_LOOP = 2;
+public abstract class OpportunisticContainerAllocator {
 
   private int maxAllocationsPerAMHeartbeat = -1;
 
@@ -212,9 +200,6 @@ public class OpportunisticContainerAllocator {
     }
   }
 
-  private static final Logger LOG =
-      LoggerFactory.getLogger(OpportunisticContainerAllocator.class);
-
   private static final ResourceCalculator RESOURCE_CALCULATOR =
       new DominantResourceCalculator();
 
@@ -238,26 +223,30 @@ public class OpportunisticContainerAllocator {
     }
   }
 
-  static class EnrichedResourceRequest {
+  /**
+   * This class encapsulates Resource Request and provides requests per
+   * node and rack.
+   */
+  public static class EnrichedResourceRequest {
     private final Map<String, AtomicInteger> nodeLocations = new HashMap<>();
     private final Map<String, AtomicInteger> rackLocations = new HashMap<>();
     private final ResourceRequest request;
     private final long timestamp;
 
-    EnrichedResourceRequest(ResourceRequest request) {
+    public EnrichedResourceRequest(ResourceRequest request) {
       this.request = request;
       timestamp = Time.monotonicNow();
     }
 
-    long getTimestamp() {
+    public long getTimestamp() {
       return timestamp;
     }
 
-    ResourceRequest getRequest() {
+    public ResourceRequest getRequest() {
       return request;
     }
 
-    void addLocation(String location, int count) {
+    public void addLocation(String location, int count) {
       Map<String, AtomicInteger> m = rackLocations;
       if (!location.startsWith("/")) {
         m = nodeLocations;
@@ -269,7 +258,7 @@ public class OpportunisticContainerAllocator {
       }
     }
 
-    void removeLocation(String location) {
+    public void removeLocation(String location) {
       Map<String, AtomicInteger> m = rackLocations;
       AtomicInteger count = m.get(location);
       if (count == null) {
@@ -284,14 +273,15 @@ public class OpportunisticContainerAllocator {
       }
     }
 
-    Set<String> getNodeLocations() {
+    public Set<String> getNodeLocations() {
       return nodeLocations.keySet();
     }
 
-    Set<String> getRackLocations() {
+    public Set<String> getRackLocations() {
       return rackLocations.keySet();
     }
   }
+
   /**
    * Create a new Opportunistic Container Allocator.
    * @param tokenSecretManager TokenSecretManager
@@ -320,6 +310,14 @@ public class OpportunisticContainerAllocator {
   }
 
   /**
+   * Get the Max Allocations per AM heartbeat.
+   * @return maxAllocationsPerAMHeartbeat.
+   */
+  public int getMaxAllocationsPerAMHeartbeat() {
+    return this.maxAllocationsPerAMHeartbeat;
+  }
+
+  /**
    * Allocate OPPORTUNISTIC containers.
    * @param blackList Resource BlackList Request
    * @param oppResourceReqs Opportunistic Resource Requests
@@ -330,72 +328,37 @@ public class OpportunisticContainerAllocator {
    * @return List of Containers.
    * @throws YarnException YarnException
    */
-  public List<Container> allocateContainers(ResourceBlacklistRequest blackList,
+  public abstract List<Container> allocateContainers(
+      ResourceBlacklistRequest blackList,
       List<ResourceRequest> oppResourceReqs,
       ApplicationAttemptId applicationAttemptId,
       OpportunisticContainerContext opportContext, long rmIdentifier,
-      String appSubmitter) throws YarnException {
+      String appSubmitter) throws YarnException;
+
 
-    // Update black list.
+  protected void updateBlacklist(ResourceBlacklistRequest blackList,
+      OpportunisticContainerContext oppContext) {
     if (blackList != null) {
-      opportContext.getBlacklist().removeAll(blackList.getBlacklistRemovals());
-      opportContext.getBlacklist().addAll(blackList.getBlacklistAdditions());
+      oppContext.getBlacklist().removeAll(blackList.getBlacklistRemovals());
+      oppContext.getBlacklist().addAll(blackList.getBlacklistAdditions());
     }
+  }
 
-    // Add OPPORTUNISTIC requests to the outstanding ones.
-    opportContext.addToOutstandingReqs(oppResourceReqs);
-    Set<String> nodeBlackList = new HashSet<>(opportContext.getBlacklist());
-    Set<String> allocatedNodes = new HashSet<>();
-    List<Container> allocatedContainers = new ArrayList<>();
-
-    // Satisfy the outstanding OPPORTUNISTIC requests.
-    boolean continueLoop = true;
-    while (continueLoop) {
-      continueLoop = false;
-      List<Map<Resource, List<Allocation>>> allocations = new ArrayList<>();
-      for (SchedulerRequestKey schedulerKey :
-          opportContext.getOutstandingOpReqs().descendingKeySet()) {
-        // Allocated containers :
-        //  Key = Requested Capability,
-        //  Value = List of Containers of given cap (the actual container size
-        //          might be different than what is requested, which is why
-        //          we need the requested capability (key) to match against
-        //          the outstanding reqs)
-        int remAllocs = -1;
-        if (maxAllocationsPerAMHeartbeat > 0) {
-          remAllocs =
-              maxAllocationsPerAMHeartbeat - allocatedContainers.size()
-                  - getTotalAllocations(allocations);
-          if (remAllocs <= 0) {
-            LOG.info("Not allocating more containers as we have reached max "
-                    + "allocations per AM heartbeat {}",
-                maxAllocationsPerAMHeartbeat);
-            break;
-          }
-        }
-        Map<Resource, List<Allocation>> allocation = allocate(
-            rmIdentifier, opportContext, schedulerKey, applicationAttemptId,
-            appSubmitter, nodeBlackList, allocatedNodes, remAllocs);
-        if (allocation.size() > 0) {
-          allocations.add(allocation);
-          continueLoop = true;
-        }
-      }
-      for (Map<Resource, List<Allocation>> allocation : allocations) {
-        for (Map.Entry<Resource, List<Allocation>> e : allocation.entrySet()) {
-          opportContext.matchAllocationToOutstandingRequest(
-              e.getKey(), e.getValue());
-          for (Allocation alloc : e.getValue()) {
-            allocatedContainers.add(alloc.getContainer());
-          }
+  protected void matchAllocation(List<Map<Resource,
+      List<Allocation>>> allocations, List<Container> allocatedContainers,
+      OpportunisticContainerContext oppContext) {
+    for (Map<Resource, List<Allocation>> allocation : allocations) {
+      for (Map.Entry<Resource, List<Allocation>> e : allocation.entrySet()) {
+        oppContext.matchAllocationToOutstandingRequest(
+            e.getKey(), e.getValue());
+        for (Allocation alloc : e.getValue()) {
+          allocatedContainers.add(alloc.getContainer());
         }
       }
     }
-
-    return allocatedContainers;
   }
 
-  private int getTotalAllocations(
+  protected int getTotalAllocations(
       List<Map<Resource, List<Allocation>>> allocations) {
     int totalAllocs = 0;
     for (Map<Resource, List<Allocation>> allocation : allocations) {
@@ -406,223 +369,8 @@ public class OpportunisticContainerAllocator {
     return totalAllocs;
   }
 
-  private Map<Resource, List<Allocation>> allocate(long rmIdentifier,
-      OpportunisticContainerContext appContext, SchedulerRequestKey schedKey,
-      ApplicationAttemptId appAttId, String userName, Set<String> blackList,
-      Set<String> allocatedNodes, int maxAllocations)
-      throws YarnException {
-    Map<Resource, List<Allocation>> containers = new HashMap<>();
-    for (EnrichedResourceRequest enrichedAsk :
-        appContext.getOutstandingOpReqs().get(schedKey).values()) {
-      int remainingAllocs = -1;
-      if (maxAllocations > 0) {
-        int totalAllocated = 0;
-        for (List<Allocation> allocs : containers.values()) {
-          totalAllocated += allocs.size();
-        }
-        remainingAllocs = maxAllocations - totalAllocated;
-        if (remainingAllocs <= 0) {
-          LOG.info("Not allocating more containers as max allocations per AM "
-                  + "heartbeat {} has reached", maxAllocationsPerAMHeartbeat);
-          break;
-        }
-      }
-      allocateContainersInternal(rmIdentifier, appContext.getAppParams(),
-          appContext.getContainerIdGenerator(), blackList, allocatedNodes,
-          appAttId, appContext.getNodeMap(), userName, containers, enrichedAsk,
-          remainingAllocs);
-      ResourceRequest anyAsk = enrichedAsk.getRequest();
-      if (!containers.isEmpty()) {
-        LOG.info("Opportunistic allocation requested for [priority={}, "
-            + "allocationRequestId={}, num_containers={}, capability={}] "
-            + "allocated = {}", anyAsk.getPriority(),
-            anyAsk.getAllocationRequestId(), anyAsk.getNumContainers(),
-            anyAsk.getCapability(), containers.keySet());
-      }
-    }
-    return containers;
-  }
-
-  private void allocateContainersInternal(long rmIdentifier,
-      AllocationParams appParams, ContainerIdGenerator idCounter,
-      Set<String> blacklist, Set<String> allocatedNodes,
-      ApplicationAttemptId id, Map<String, RemoteNode> allNodes,
-      String userName, Map<Resource, List<Allocation>> allocations,
-      EnrichedResourceRequest enrichedAsk, int maxAllocations)
-      throws YarnException {
-    if (allNodes.size() == 0) {
-      LOG.info("No nodes currently available to " +
-          "allocate OPPORTUNISTIC containers.");
-      return;
-    }
-    ResourceRequest anyAsk = enrichedAsk.getRequest();
-    int toAllocate = anyAsk.getNumContainers()
-        - (allocations.isEmpty() ? 0 :
-            allocations.get(anyAsk.getCapability()).size());
-    toAllocate = Math.min(toAllocate,
-        appParams.getMaxAllocationsPerSchedulerKeyPerRound());
-    if (maxAllocations >= 0) {
-      toAllocate = Math.min(maxAllocations, toAllocate);
-    }
-    int numAllocated = 0;
-    // Node Candidates are selected as follows:
-    // * Node local candidates selected in loop == 0
-    // * Rack local candidates selected in loop == 1
-    // * From loop == 2 onwards, we revert to off switch allocations.
-    int loopIndex = OFF_SWITCH_LOOP;
-    if (enrichedAsk.getNodeLocations().size() > 0) {
-      loopIndex = NODE_LOCAL_LOOP;
-    }
-    while (numAllocated < toAllocate) {
-      Collection<RemoteNode> nodeCandidates =
-          findNodeCandidates(loopIndex, allNodes, blacklist, allocatedNodes,
-              enrichedAsk);
-      for (RemoteNode rNode : nodeCandidates) {
-        String rNodeHost = rNode.getNodeId().getHost();
-        // Ignore black list
-        if (blacklist.contains(rNodeHost)) {
-          LOG.info("Nodes for scheduling has a blacklisted node" +
-              " [" + rNodeHost + "]..");
-          continue;
-        }
-        String location = ResourceRequest.ANY;
-        if (loopIndex == NODE_LOCAL_LOOP) {
-          if (enrichedAsk.getNodeLocations().contains(rNodeHost)) {
-            location = rNodeHost;
-          } else {
-            continue;
-          }
-        } else if (allocatedNodes.contains(rNodeHost)) {
-          LOG.info("Opportunistic container has already been allocated on {}.",
-              rNodeHost);
-          continue;
-        }
-        if (loopIndex == RACK_LOCAL_LOOP) {
-          if (enrichedAsk.getRackLocations().contains(rNode.getRackName())) {
-            location = rNode.getRackName();
-          } else {
-            continue;
-          }
-        }
-        Container container = createContainer(rmIdentifier, appParams,
-            idCounter, id, userName, allocations, location,
-            anyAsk, rNode);
-        numAllocated++;
-        updateMetrics(loopIndex);
-        allocatedNodes.add(rNodeHost);
-        LOG.info("Allocated [" + container.getId() + "] as opportunistic at " +
-            "location [" + location + "]");
-        if (numAllocated >= toAllocate) {
-          break;
-        }
-      }
-      if (loopIndex == NODE_LOCAL_LOOP &&
-          enrichedAsk.getRackLocations().size() > 0) {
-        loopIndex = RACK_LOCAL_LOOP;
-      } else {
-        loopIndex++;
-      }
-      // Handle case where there are no nodes remaining after blacklist is
-      // considered.
-      if (loopIndex > OFF_SWITCH_LOOP && numAllocated == 0) {
-        LOG.warn("Unable to allocate any opportunistic containers.");
-        break;
-      }
-    }
-  }
-
-  private void updateMetrics(int loopIndex) {
-    OpportunisticSchedulerMetrics metrics =
-        OpportunisticSchedulerMetrics.getMetrics();
-    if (loopIndex == NODE_LOCAL_LOOP) {
-      metrics.incrNodeLocalOppContainers();
-    } else if (loopIndex == RACK_LOCAL_LOOP) {
-      metrics.incrRackLocalOppContainers();
-    } else {
-      metrics.incrOffSwitchOppContainers();
-    }
-  }
-
-  private Collection<RemoteNode> findNodeCandidates(int loopIndex,
-      Map<String, RemoteNode> allNodes, Set<String> blackList,
-      Set<String> allocatedNodes, EnrichedResourceRequest enrichedRR) {
-    LinkedList<RemoteNode> retList = new LinkedList<>();
-    String partition = getRequestPartition(enrichedRR);
-    if (loopIndex > 1) {
-      for (RemoteNode remoteNode : allNodes.values()) {
-        if (StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) 
{
-          retList.add(remoteNode);
-        }
-      }
-      return retList;
-    } else {
-
-      int numContainers = enrichedRR.getRequest().getNumContainers();
-      while (numContainers > 0) {
-        if (loopIndex == 0) {
-          // Node local candidates
-          numContainers = collectNodeLocalCandidates(
-              allNodes, enrichedRR, retList, numContainers);
-        } else {
-          // Rack local candidates
-          numContainers =
-              collectRackLocalCandidates(allNodes, enrichedRR, retList,
-                  blackList, allocatedNodes, numContainers);
-        }
-        if (numContainers == enrichedRR.getRequest().getNumContainers()) {
-          // If there is no change in numContainers, then there is no point
-          // in looping again.
-          break;
-        }
-      }
-      return retList;
-    }
-  }
-
-  private int collectRackLocalCandidates(Map<String, RemoteNode> allNodes,
-      EnrichedResourceRequest enrichedRR, LinkedList<RemoteNode> retList,
-      Set<String> blackList, Set<String> allocatedNodes, int numContainers) {
-    String partition = getRequestPartition(enrichedRR);
-    for (RemoteNode rNode : allNodes.values()) {
-      if (StringUtils.equals(partition, getRemoteNodePartition(rNode)) &&
-          enrichedRR.getRackLocations().contains(rNode.getRackName())) {
-        String rHost = rNode.getNodeId().getHost();
-        if (blackList.contains(rHost)) {
-          continue;
-        }
-        if (allocatedNodes.contains(rHost)) {
-          retList.addLast(rNode);
-        } else {
-          retList.addFirst(rNode);
-          numContainers--;
-        }
-      }
-      if (numContainers == 0) {
-        break;
-      }
-    }
-    return numContainers;
-  }
-
-  private int collectNodeLocalCandidates(Map<String, RemoteNode> allNodes,
-      EnrichedResourceRequest enrichedRR, List<RemoteNode> retList,
-      int numContainers) {
-    String partition = getRequestPartition(enrichedRR);
-    for (String nodeName : enrichedRR.getNodeLocations()) {
-      RemoteNode remoteNode = allNodes.get(nodeName);
-      if (remoteNode != null &&
-          StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) {
-        retList.add(remoteNode);
-        numContainers--;
-      }
-      if (numContainers == 0) {
-        break;
-      }
-    }
-    return numContainers;
-  }
-
-  private Container createContainer(long rmIdentifier,
+  @SuppressWarnings("checkstyle:parameternumber")
+  protected Container createContainer(long rmIdentifier,
       AllocationParams appParams, ContainerIdGenerator idCounter,
       ApplicationAttemptId id, String userName,
       Map<Resource, List<Allocation>> allocations, String location,
@@ -654,6 +402,7 @@ public class OpportunisticContainerAllocator {
         SchedulerRequestKey.create(rr), userName, node, cId, capability);
   }
 
+  @SuppressWarnings("checkstyle:parameternumber")
   private Container createContainer(long rmIdentifier, long tokenExpiry,
       SchedulerRequestKey schedulerKey, String userName, RemoteNode node,
       ContainerId cId, Resource capability) {
@@ -718,7 +467,7 @@ public class OpportunisticContainerAllocator {
     return partitionedRequests;
   }
 
-  private String getRequestPartition(EnrichedResourceRequest enrichedRR) {
+  protected String getRequestPartition(EnrichedResourceRequest enrichedRR) {
     String partition = enrichedRR.getRequest().getNodeLabelExpression();
     if (partition == null) {
       partition = CommonNodeLabelsManager.NO_LABEL;
@@ -726,7 +475,7 @@ public class OpportunisticContainerAllocator {
     return partition;
   }
 
-  private String getRemoteNodePartition(RemoteNode node) {
+  protected String getRemoteNodePartition(RemoteNode node) {
     String partition = node.getNodePartition();
     if (partition == null) {
       partition = CommonNodeLabelsManager.NO_LABEL;
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
index 548ddad..6a91f41 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
@@ -98,7 +98,7 @@ public class TestOpportunisticContainerAllocator {
             return new byte[]{1, 2};
           }
         };
-    allocator = new OpportunisticContainerAllocator(secMan);
+    allocator = new DistributedOpportunisticContainerAllocator(secMan);
     oppCntxt = new OpportunisticContainerContext();
     oppCntxt.getAppParams().setMinResource(Resource.newInstance(1024, 1));
     oppCntxt.getAppParams().setIncrementResource(Resource.newInstance(512, 1));
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index db3aaca..4bbae34 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -77,6 +77,7 @@ import 
org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecret
 import 
org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import 
org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
+import 
org.apache.hadoop.yarn.server.scheduler.DistributedOpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.state.MultiStateTransitionListener;
@@ -479,7 +480,7 @@ public class NodeManager extends CompositeService
         YarnConfiguration.
             DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT);
     ((NMContext) context).setQueueableContainerAllocator(
-        new OpportunisticContainerAllocator(
+        new DistributedOpportunisticContainerAllocator(
             context.getContainerTokenSecretManager(),
             maxAllocationsPerAMHeartbeat));
 
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
index dee2a20..5a0715e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
@@ -44,6 +44,7 @@ import 
org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
 import 
org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import 
org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 
+import 
org.apache.hadoop.yarn.server.scheduler.DistributedOpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
@@ -232,7 +233,8 @@ public class TestDistributedScheduler {
     };
     nmContainerTokenSecretManager.setMasterKey(mKey);
     OpportunisticContainerAllocator containerAllocator =
-        new OpportunisticContainerAllocator(nmContainerTokenSecretManager);
+        new DistributedOpportunisticContainerAllocator(
+            nmContainerTokenSecretManager);
 
     NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
         new NMTokenSecretManagerInNM();
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index a360ed2..4475caf 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
+import 
org.apache.hadoop.yarn.server.scheduler.DistributedOpportunisticContainerAllocator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -233,9 +234,10 @@ public class OpportunisticContainerAllocatorAMService
         YarnConfiguration.OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT,
         YarnConfiguration.
             DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT);
-    this.oppContainerAllocator = new OpportunisticContainerAllocator(
-        rmContext.getContainerTokenSecretManager(),
-        maxAllocationsPerAMHeartbeat);
+    this.oppContainerAllocator =
+        new DistributedOpportunisticContainerAllocator(
+            rmContext.getContainerTokenSecretManager(),
+            maxAllocationsPerAMHeartbeat);
     this.k = rmContext.getYarnConfiguration().getInt(
         YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED,
         YarnConfiguration.DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to