This is an automated email from the ASF dual-hosted git repository.

bipinprasad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new f1716be  [STORM-3739] Scheduling should sort numa zones by host groups 
(#3379)
f1716be is described below

commit f1716be3d630c44c93787500af9cee427652f548
Author: Bipin Prasad <[email protected]>
AuthorDate: Mon Mar 15 12:18:35 2021 -0500

    [STORM-3739] Scheduling should sort numa zones by host groups (#3379)
    
    * [STORM-3739] Scheduling should sort numa zones by host groups
    
    * [STORM-3739] Ignore blacklisted hosts and corresponding 
nodes/supervisors.
    
    * [STORM-3739] Javadoc fix.
    
    * [STORM-3739] Add check for null hostname, remove unused code, set 
totalResources.
    
    * [STORM-3739] Remove unused methods.
    
    * [STORM-3739] Sort hosts comparing average resources before comparing min 
resources.
    
    * [STORM-3739] Add dead node check.
    
    * [STORM-3739] NodeSortType changes and revert.
    
    * [STORM-3739] Removed unused/commented code.
    
    Co-authored-by: Bipin Prasad <[email protected]>
---
 .../apache/storm/scheduler/ISchedulingState.java   |   42 +
 .../apache/storm/scheduler/resource/RasNodes.java  |   12 +
 .../normalization/NormalizedResources.java         |   21 +-
 .../scheduling/BaseResourceAwareStrategy.java      |   52 +-
 .../strategies/scheduling/ObjectResourcesItem.java |    9 +-
 .../strategies/scheduling/sorter/INodeSorter.java  |   12 +-
 .../strategies/scheduling/sorter/NodeSorter.java   |  196 ++--
 ...odeSorter.java => NodeSorterHostProximity.java} |  381 ++++---
 .../TestUtilsForResourceAwareScheduler.java        |   54 +-
 .../scheduling/TestConstraintSolverStrategy.java   |   16 +-
 .../TestDefaultResourceAwareStrategy.java          |   36 +-
 .../sorter/TestNodeSorterHostProximity.java        | 1036 ++++++++++++++++++++
 12 files changed, 1567 insertions(+), 300 deletions(-)

diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java 
b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
index c584351..cc3561e 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
@@ -19,12 +19,15 @@
 package org.apache.storm.scheduler;
 
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.storm.daemon.nimbus.TopologyResources;
 import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
 import 
org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
 import 
org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
 
@@ -283,6 +286,18 @@ public interface ISchedulingState {
     Map<String, List<String>> getNetworkTopography();
 
     /**
+     * Get host -> rack map - the inverse of networkTopography.
+     */
+    default Map<String, String> getHostToRack() {
+        Map<String, String> ret = new HashMap<>();
+        Map<String, List<String>> networkTopography = getNetworkTopography();
+        if (networkTopography != null) {
+            networkTopography.forEach((rack, hosts) -> hosts.forEach(host -> 
ret.put(host, rack)));
+        }
+        return ret;
+    }
+
+    /**
      * Get all topology scheduler statuses.
      */
     Map<String, String> getStatusMap();
@@ -339,4 +354,31 @@ public interface ISchedulingState {
      * Get the nimbus configuration.
      */
     Map<String, Object> getConf();
+
+    /**
+     * Determine the set of racks on which topologyIds have been assigned. 
Note that the returned set
+     * may contain {@link DNSToSwitchMapping#DEFAULT_RACK} if {@link 
#getHostToRack()} is empty or
+     * does not contain the assigned host.
+     *
+     * @param topologyIds for which assignments are examined.
+     * @return set of racks on which assignments have been made.
+     */
+    default Set<String> getAssignedRacks(String... topologyIds) {
+        Set<String> ret = new HashSet<>();
+        Map<String, String> networkTopographyInverted = getHostToRack();
+        for (String topologyId: topologyIds) {
+            SchedulerAssignment assignment = getAssignmentById(topologyId);
+            if (assignment == null) {
+                continue;
+            }
+            for (WorkerSlot slot : assignment.getSlots()) {
+                String nodeId = slot.getNodeId();
+                SupervisorDetails supervisorDetails = 
getSupervisorById(nodeId);
+                String hostId = supervisorDetails.getHost();
+                String rackId = networkTopographyInverted.getOrDefault(hostId, 
DNSToSwitchMapping.DEFAULT_RACK);
+                ret.add(rackId);
+            }
+        }
+        return ret;
+    }
 }
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNodes.java 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNodes.java
index 427fb56..645c4e0 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNodes.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNodes.java
@@ -156,6 +156,18 @@ public class RasNodes {
         return hostnameToNodes;
     }
 
+    /**
+     * Get a map from RasNodeId to HostName.
+     *
+     * @return map of nodeId to hostname
+     */
+    public Map<String, String> getNodeIdToHostname() {
+        Map<String, String> nodeIdToHostname = new HashMap<>();
+        nodeMap.values()
+                .forEach(node -> nodeIdToHostname.put(node.getId(), 
node.getHostname()));
+        return nodeIdToHostname;
+    }
+
     @Override
     public String toString() {
         StringBuilder ret = new StringBuilder();
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
index ea77b1a..de097a6 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
@@ -247,10 +247,16 @@ public class NormalizedResources {
         return null;
     }
 
+    private void throwBecauseUsedIsNotSubsetOfTotal(NormalizedResources used, 
double totalMemoryMb, double usedMemoryMb, String info) {
+        throw new IllegalArgumentException(String.format("The used resources 
must be a subset of the total resources."
+                        + " Used: '%s', Total: '%s', Used Mem: '%f', Total 
Mem: '%f', additionalInfo: '%s'",
+                used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, 
totalMemoryMb, info));
+    }
+
     private void throwBecauseUsedIsNotSubsetOfTotal(NormalizedResources used, 
double totalMemoryMb, double usedMemoryMb) {
         throw new IllegalArgumentException(String.format("The used resources 
must be a subset of the total resources."
-                                                         + " Used: '%s', 
Total: '%s', Used Mem: '%f', Total Mem: '%f'",
-                                                         
used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb));
+                        + " Used: '%s', Total: '%s', Used Mem: '%f', Total 
Mem: '%f'",
+                used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, 
totalMemoryMb));
     }
 
     /**
@@ -375,7 +381,8 @@ public class NormalizedResources {
                 return 0;
             }
             if (used.otherResources[i] > otherResources[i]) {
-                throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, 
usedMemoryMb);
+                String info = String.format("%s, %f > %f", 
getResourceNameForResourceIndex(i), used.otherResources[i], otherResources[i]);
+                throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, 
usedMemoryMb, info);
             }
             min = Math.min(min, used.otherResources[i] / otherResources[i]);
         }
@@ -384,14 +391,16 @@ public class NormalizedResources {
 
     /**
      * If a node or rack has a kind of resource not in a request, make that 
resource negative so when sorting that node or rack will
-     * be less likely to be selected.
+     * be less likely to be selected. If the resource is in the request, make 
that resource positive.
      * @param request the requested resources.
      */
     public void updateForRareResourceAffinity(NormalizedResources request) {
         int length = Math.min(this.otherResources.length, 
request.otherResources.length);
         for (int i = 0; i < length; i++) {
-            if (request.getResourceAt(i) == 0.0) {
-                this.otherResources[i] = -1 * this.otherResources[i];
+            if (request.getResourceAt(i) == 0.0 && this.otherResources[i] > 
0.0) {
+                this.otherResources[i] = -this.otherResources[i]; // make 
negative
+            } else if (request.getResourceAt(i) > 0.0 && 
this.otherResources[i] < 0.0) {
+                this.otherResources[i] = -this.otherResources[i]; // make 
positive
             }
         }
     }
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 6f49083..4bc4d0b 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -44,6 +44,7 @@ import 
org.apache.storm.scheduler.resource.strategies.scheduling.sorter.ExecSort
 import 
org.apache.storm.scheduler.resource.strategies.scheduling.sorter.IExecSorter;
 import 
org.apache.storm.scheduler.resource.strategies.scheduling.sorter.INodeSorter;
 import 
org.apache.storm.scheduler.resource.strategies.scheduling.sorter.NodeSorter;
+import 
org.apache.storm.scheduler.resource.strategies.scheduling.sorter.NodeSorterHostProximity;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Time;
 import org.slf4j.Logger;
@@ -58,9 +59,25 @@ public abstract class BaseResourceAwareStrategy implements 
IStrategy {
      * Refer to {@link NodeSorter#NodeSorter(Cluster, TopologyDetails, 
NodeSortType)} for more details.
      */
     public enum NodeSortType {
-        GENERIC_RAS, // for deprecation, Used by 
GenericResourceAwareStrategyOld
-        DEFAULT_RAS, // for deprecation, Used by 
DefaultResourceAwareStrategyOld
-        COMMON       // new and only node sorting type going forward
+        /**
+         * Generic Resource Aware Strategy sorting type.
+         * @deprecated used by GenericResourceAwareStrategyOld only. Use {@link 
#COMMON} instead.
+         */
+        @Deprecated
+        GENERIC_RAS,
+
+        /**
+         * Default Resource Aware Strategy sorting type.
+         * @deprecated used by DefaultResourceAwareStrategyOld only. Use {@link 
#COMMON} instead.
+         */
+        @Deprecated
+        DEFAULT_RAS,
+
+        /**
+         * New and only node sorting type going forward.
+         * Refer to {@link NodeSorterHostProximity#NodeSorterHostProximity(Cluster, 
TopologyDetails)} for more details.
+         */
+        COMMON,
     }
 
     // instance variables from class instantiation
@@ -149,7 +166,8 @@ public abstract class BaseResourceAwareStrategy implements 
IStrategy {
         List<ExecutorDetails> orderedExecutors = 
execSorter.sortExecutors(unassignedExecutors);
         Iterable<String> sortedNodes = null;
         if (!this.sortNodesForEachExecutor) {
-            sortedNodes = nodeSorter.sortAllNodes(null);
+            nodeSorter.prepare(null);
+            sortedNodes = nodeSorter.sortAllNodes();
         }
         return scheduleExecutorsOnNodes(orderedExecutors, sortedNodes);
     }
@@ -189,7 +207,7 @@ public abstract class BaseResourceAwareStrategy implements 
IStrategy {
         LOG.debug("The max state search that will be used by topology {} is 
{}", topologyDetails.getId(), maxStateSearch);
 
         searcherState = createSearcherState();
-        setNodeSorter(new NodeSorter(cluster, topologyDetails, nodeSortType));
+        setNodeSorter(new NodeSorterHostProximity(cluster, topologyDetails, 
nodeSortType));
         setExecSorter(orderExecutorsByProximity
                 ? new ExecSorterByProximity(topologyDetails)
                 : new ExecSorterByConnectionCount(topologyDetails));
@@ -413,7 +431,8 @@ public abstract class BaseResourceAwareStrategy implements 
IStrategy {
         for (int i = 0; i < maxExecCnt ; i++) {
             progressIdxForExec[i] = -1;
         }
-        LOG.info("scheduleExecutorsOnNodes: will assign {} executors for topo 
{}", maxExecCnt, topoName);
+        LOG.debug("scheduleExecutorsOnNodes: will assign {} executors for topo 
{}, sortNodesForEachExecutor={}",
+                maxExecCnt, topoName, sortNodesForEachExecutor);
 
         OUTERMOST_LOOP:
         for (int loopCnt = 0 ; true ; loopCnt++) {
@@ -450,7 +469,8 @@ public abstract class BaseResourceAwareStrategy implements 
IStrategy {
             String comp = execToComp.get(exec);
             if (sortedNodesIter == null || (this.sortNodesForEachExecutor && 
searcherState.isExecCompDifferentFromPrior())) {
                 progressIdx = -1;
-                sortedNodesIter = nodeSorter.sortAllNodes(exec);
+                nodeSorter.prepare(exec);
+                sortedNodesIter = nodeSorter.sortAllNodes();
             }
 
             for (String nodeId : sortedNodesIter) {
@@ -470,8 +490,8 @@ public abstract class BaseResourceAwareStrategy implements 
IStrategy {
                     if (numBoundAckerAssigned == -1) {
                         // This only happens when trying to assign bound 
ackers to the worker slot and failed.
                         // Free the entire worker slot and put those bound 
ackers back to unassigned list
-                        LOG.debug("Failed to assign bound acker for exec: {} 
of topo: {} to worker: {}.  Backtracking.",
-                            exec, topoName, workerSlot);
+                        LOG.debug("Failed to assign bound acker for exec={}, 
comp={}, topo: {} to worker: {}.  Backtracking.",
+                            exec, comp, topoName, workerSlot);
                         searcherState.freeWorkerSlotWithBoundAckers(node, 
workerSlot);
                         continue;
                     }
@@ -481,9 +501,13 @@ public abstract class BaseResourceAwareStrategy implements 
IStrategy {
                         // and this is not the first exec to this workerSlot.
                         // So just go to next workerSlot and don't free the 
worker.
                         if (numBoundAckerAssigned > 0) {
-                            LOG.debug("Failed to assign exec: {} of topo: {} 
with bound ackers to worker: {}.  Backtracking.",
-                                exec, topoName, workerSlot);
+                            LOG.debug("Failed to assign exec={}, comp={}, 
topo={} with bound ackers to worker: {}.  Backtracking.",
+                                exec, comp, topoName, workerSlot);
                             searcherState.freeWorkerSlotWithBoundAckers(node, 
workerSlot);
+                        } else {
+                            LOG.debug("Failed to assign exec={}, comp={}, 
topo={} to worker={} on node=({}, availCpu={}, availMem={}).",
+                                exec, comp, topoName, workerSlot,
+                                node.getId(), node.getAvailableCpuResources(), 
node.getAvailableMemoryResources());
                         }
                         continue;
                     }
@@ -506,8 +530,10 @@ public abstract class BaseResourceAwareStrategy implements 
IStrategy {
                     searcherState = searcherState.nextExecutor();
                     nodeForExec[execIndex] = node;
                     workerSlotForExec[execIndex] = workerSlot;
-                    LOG.debug("scheduleExecutorsOnNodes: Assigned execId={}, 
comp={} to node={}, slot-ordinal={} at loopCnt={}, topo={}",
-                            execIndex, comp, nodeId, progressIdx, loopCnt, 
topoName);
+                    LOG.debug("scheduleExecutorsOnNodes: Assigned execId={}, 
comp={} to node={}/cpu={}/mem={}, "
+                            + "slot-ordinal={} at loopCnt={}, topo={}",
+                        execIndex, comp, nodeId, 
node.getAvailableCpuResources(), node.getAvailableMemoryResources(),
+                        progressIdx, loopCnt, topoName);
                     continue OUTERMOST_LOOP;
                 }
             }
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ObjectResourcesItem.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ObjectResourcesItem.java
index 5532d78..7b4f7d2 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ObjectResourcesItem.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ObjectResourcesItem.java
@@ -30,7 +30,7 @@ public class ObjectResourcesItem {
 
     /**
      * Amongst all {@link #availableResources}, this is the minimum ratio of 
resource to the total available in group.
-     * Note that nodes are grouped into racks. And racks are grouped under the 
cluster.
+     * Note that nodes are grouped into hosts. Hosts into racks. And racks are 
grouped under the cluster.
      *
      * <p>
      * An example of this calculation is in
@@ -43,7 +43,7 @@ public class ObjectResourcesItem {
 
     /**
      * Amongst all {@link #availableResources}, this is the average ratio of 
resource to the total available in group.
-     * Note that nodes are grouped into racks. And racks are grouped under the 
cluster.
+     * Note that nodes are grouped into hosts, hosts into racks, and racks are 
grouped under the cluster.
      *
      * <p>
      * An example of this calculation is in
@@ -73,6 +73,11 @@ public class ObjectResourcesItem {
         this.avgResourcePercent = avgResourcePercent;
     }
 
+    public void add(ObjectResourcesItem other) {
+        this.availableResources.add(other.availableResources);
+        this.totalResources.add(other.totalResources);
+    }
+
     @Override
     public String toString() {
         return this.id;
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/INodeSorter.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/INodeSorter.java
index ceda82e..f2a4e8d 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/INodeSorter.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/INodeSorter.java
@@ -18,14 +18,20 @@
 
 package org.apache.storm.scheduler.resource.strategies.scheduling.sorter;
 
-import java.util.TreeSet;
 import org.apache.storm.scheduler.ExecutorDetails;
 import 
org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesItem;
 
 
 public interface INodeSorter {
 
-    TreeSet<ObjectResourcesItem> sortRacks(ExecutorDetails exec);
+    /**
+     * Prepare for node sorting. This method must be called before {@link 
#getSortedRacks()} and {@link #sortAllNodes()}.
+     *
+     * @param exec optional, may be null.
+     */
+    void prepare(ExecutorDetails exec);
 
-    Iterable<String> sortAllNodes(ExecutorDetails exec);
+    Iterable<ObjectResourcesItem> getSortedRacks();
+
+    Iterable<String> sortAllNodes();
 }
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java
index 85a9015..7ff4b05 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java
@@ -21,6 +21,7 @@ package 
org.apache.storm.scheduler.resource.strategies.scheduling.sorter;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -28,7 +29,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -70,6 +70,9 @@ public class NodeSorter implements INodeSorter {
     protected List<String> favoredNodeIds;
     protected List<String> unFavoredNodeIds;
 
+    // Updated in prepare method
+    ExecutorDetails exec;
+
     /**
      * Initialize for the default implementation node sorting.
      *
@@ -93,21 +96,18 @@ public class NodeSorter implements INodeSorter {
 
         // from Cluster
         networkTopography = cluster.getNetworkTopography();
-        Map<String, String> hostToRack = new HashMap<>();
-        for (Map.Entry<String, List<String>> entry : 
networkTopography.entrySet()) {
-            String rackId = entry.getKey();
-            for (String hostName: entry.getValue()) {
-                hostToRack.put(hostName, rackId);
-            }
-        }
+        Map<String, String> hostToRack = cluster.getHostToRack();
         RasNodes nodes = new RasNodes(cluster);
         for (RasNode node: nodes.getNodes()) {
             String superId = node.getId();
             String hostName = node.getHostname();
+            if (!node.isAlive() || hostName == null) {
+                continue;
+            }
             String rackId = hostToRack.getOrDefault(hostName, 
DNSToSwitchMapping.DEFAULT_RACK);
             superIdToRack.put(superId, rackId);
             hostnameToNodes.computeIfAbsent(hostName, (hn) -> new 
ArrayList<>()).add(node);
-            rackIdToNodes.computeIfAbsent(rackId, (hn) -> new 
ArrayList<>()).add(node);
+            rackIdToNodes.computeIfAbsent(rackId, (rid) -> new 
ArrayList<>()).add(node);
         }
         this.greyListedSupervisorIds = cluster.getGreyListedSupervisors();
 
@@ -122,16 +122,21 @@ public class NodeSorter implements INodeSorter {
         unFavoredNodeIds.removeAll(favoredNodeIds);
     }
 
+    @Override
+    public void prepare(ExecutorDetails exec) {
+        this.exec = exec;
+    }
+
     /**
-     * Scheduling uses {@link #sortAllNodes(ExecutorDetails)} which eventually
-     * calls this method whose behavior can altered by setting {@link 
#nodeSortType}.
+     * Scheduling uses {@link #sortAllNodes()} which eventually
+     * calls this method whose behavior can be altered by setting {@link 
#nodeSortType}.
      *
      * @param resourcesSummary     contains all individual {@link 
ObjectResourcesItem} as well as cumulative stats
      * @param exec                 executor for which the sorting is done
      * @param existingScheduleFunc a function to get existing executors 
already scheduled on this object
      * @return a sorted list of {@link ObjectResourcesItem}
      */
-    protected TreeSet<ObjectResourcesItem> sortObjectResources(
+    protected List<ObjectResourcesItem> sortObjectResources(
             ObjectResourcesSummary resourcesSummary, ExecutorDetails exec, 
ExistingScheduleFunc existingScheduleFunc) {
         switch (nodeSortType) {
             case DEFAULT_RAS:
@@ -180,7 +185,7 @@ public class NodeSorter implements INodeSorter {
      * @param existingScheduleFunc a function to get existing executors 
already scheduled on this object
      * @return a sorted list of ObjectResources
      */
-    private TreeSet<ObjectResourcesItem> sortObjectResourcesCommon(
+    private List<ObjectResourcesItem> sortObjectResourcesCommon(
             final ObjectResourcesSummary allResources, final ExecutorDetails 
exec,
             final ExistingScheduleFunc existingScheduleFunc) {
         // Copy and modify allResources
@@ -203,34 +208,31 @@ public class NodeSorter implements INodeSorter {
         );
 
         // Use the following comparator to return a sorted set
-        TreeSet<ObjectResourcesItem> sortedObjectResources =
-                new TreeSet<>((o1, o2) -> {
-                    int execsScheduled1 = 
existingScheduleFunc.getNumExistingSchedule(o1.id);
-                    int execsScheduled2 = 
existingScheduleFunc.getNumExistingSchedule(o2.id);
-                    if (execsScheduled1 > execsScheduled2) {
-                        return -1;
-                    } else if (execsScheduled1 < execsScheduled2) {
-                        return 1;
-                    } else {
-                        double o1Avg = o1.avgResourcePercent;
-                        double o2Avg = o2.avgResourcePercent;
-
-                        if (o1Avg > o2Avg) {
-                            return -1;
-                        } else if (o1Avg < o2Avg) {
-                            return 1;
-                        } else {
-                            if (o1.minResourcePercent > o2.minResourcePercent) 
{
-                                return -1;
-                            } else if (o1.minResourcePercent < 
o2.minResourcePercent) {
-                                return 1;
-                            } else {
-                                return o1.id.compareTo(o2.id);
-                            }
-                        }
-                    }
-                });
+        List<ObjectResourcesItem> sortedObjectResources = new ArrayList();
+        Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+            int execsScheduled1 = 
existingScheduleFunc.getNumExistingSchedule(o1.id);
+            int execsScheduled2 = 
existingScheduleFunc.getNumExistingSchedule(o2.id);
+            if (execsScheduled1 > execsScheduled2) {
+                return -1;
+            } else if (execsScheduled1 < execsScheduled2) {
+                return 1;
+            }
+            double o1Avg = o1.avgResourcePercent;
+            double o2Avg = o2.avgResourcePercent;
+            if (o1Avg > o2Avg) {
+                return -1;
+            } else if (o1Avg < o2Avg) {
+                return 1;
+            }
+            if (o1.minResourcePercent > o2.minResourcePercent) {
+                return -1;
+            } else if (o1.minResourcePercent < o2.minResourcePercent) {
+                return 1;
+            }
+            return o1.id.compareTo(o2.id);
+        };
         
sortedObjectResources.addAll(affinityBasedAllResources.getObjectResources());
+        sortedObjectResources.sort(comparator);
         LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
         return sortedObjectResources;
     }
@@ -258,38 +260,35 @@ public class NodeSorter implements INodeSorter {
      * @return a sorted list of ObjectResources
      */
     @Deprecated
-    private TreeSet<ObjectResourcesItem> sortObjectResourcesGeneric(
+    private List<ObjectResourcesItem> sortObjectResourcesGeneric(
             final ObjectResourcesSummary allResources, ExecutorDetails exec,
             final ExistingScheduleFunc existingScheduleFunc) {
         ObjectResourcesSummary affinityBasedAllResources = new 
ObjectResourcesSummary(allResources);
         NormalizedResourceRequest requestedResources = 
topologyDetails.getTotalResources(exec);
-        for (ObjectResourcesItem objectResources : 
affinityBasedAllResources.getObjectResources()) {
-            
objectResources.availableResources.updateForRareResourceAffinity(requestedResources);
-        }
+        affinityBasedAllResources.getObjectResources()
+                .forEach(x -> 
x.availableResources.updateForRareResourceAffinity(requestedResources));
         final NormalizedResourceOffer availableResourcesOverall = 
allResources.getAvailableResourcesOverall();
 
-        TreeSet<ObjectResourcesItem> sortedObjectResources =
-                new TreeSet<>((o1, o2) -> {
-                    int execsScheduled1 = 
existingScheduleFunc.getNumExistingSchedule(o1.id);
-                    int execsScheduled2 = 
existingScheduleFunc.getNumExistingSchedule(o2.id);
-                    if (execsScheduled1 > execsScheduled2) {
-                        return -1;
-                    } else if (execsScheduled1 < execsScheduled2) {
-                        return 1;
-                    } else {
-                        double o1Avg = 
availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources);
-                        double o2Avg = 
availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources);
-
-                        if (o1Avg > o2Avg) {
-                            return -1;
-                        } else if (o1Avg < o2Avg) {
-                            return 1;
-                        } else {
-                            return o1.id.compareTo(o2.id);
-                        }
-                    }
-                });
+        List<ObjectResourcesItem> sortedObjectResources = new ArrayList<>();
+        Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+            int execsScheduled1 = 
existingScheduleFunc.getNumExistingSchedule(o1.id);
+            int execsScheduled2 = 
existingScheduleFunc.getNumExistingSchedule(o2.id);
+            if (execsScheduled1 > execsScheduled2) {
+                return -1;
+            } else if (execsScheduled1 < execsScheduled2) {
+                return 1;
+            }
+            double o1Avg = 
availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources);
+            double o2Avg = 
availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources);
+            if (o1Avg > o2Avg) {
+                return -1;
+            } else if (o1Avg < o2Avg) {
+                return 1;
+            }
+            return o1.id.compareTo(o2.id);
+        };
         
sortedObjectResources.addAll(affinityBasedAllResources.getObjectResources());
+        sortedObjectResources.sort(comparator);
         LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
         return sortedObjectResources;
     }
@@ -313,7 +312,7 @@ public class NodeSorter implements INodeSorter {
      * @return a sorted list of ObjectResources
      */
     @Deprecated
-    private TreeSet<ObjectResourcesItem> sortObjectResourcesDefault(
+    private List<ObjectResourcesItem> sortObjectResourcesDefault(
             final ObjectResourcesSummary allResources,
             final ExistingScheduleFunc existingScheduleFunc) {
 
@@ -328,32 +327,30 @@ public class NodeSorter implements INodeSorter {
                     
existingScheduleFunc.getNumExistingSchedule(objectResources.id));
         }
 
-        TreeSet<ObjectResourcesItem> sortedObjectResources =
-                new TreeSet<>((o1, o2) -> {
-                    int execsScheduled1 = 
existingScheduleFunc.getNumExistingSchedule(o1.id);
-                    int execsScheduled2 = 
existingScheduleFunc.getNumExistingSchedule(o2.id);
-                    if (execsScheduled1 > execsScheduled2) {
-                        return -1;
-                    } else if (execsScheduled1 < execsScheduled2) {
-                        return 1;
-                    } else {
-                        if (o1.minResourcePercent > o2.minResourcePercent) {
-                            return -1;
-                        } else if (o1.minResourcePercent < 
o2.minResourcePercent) {
-                            return 1;
-                        } else {
-                            double diff = o1.avgResourcePercent - 
o2.avgResourcePercent;
-                            if (diff > 0.0) {
-                                return -1;
-                            } else if (diff < 0.0) {
-                                return 1;
-                            } else {
-                                return o1.id.compareTo(o2.id);
-                            }
-                        }
-                    }
-                });
+        List<ObjectResourcesItem> sortedObjectResources = new ArrayList<>();
+        Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+            int execsScheduled1 = 
existingScheduleFunc.getNumExistingSchedule(o1.id);
+            int execsScheduled2 = 
existingScheduleFunc.getNumExistingSchedule(o2.id);
+            if (execsScheduled1 > execsScheduled2) {
+                return -1;
+            } else if (execsScheduled1 < execsScheduled2) {
+                return 1;
+            }
+            if (o1.minResourcePercent > o2.minResourcePercent) {
+                return -1;
+            } else if (o1.minResourcePercent < o2.minResourcePercent) {
+                return 1;
+            }
+            double diff = o1.avgResourcePercent - o2.avgResourcePercent;
+            if (diff > 0.0) {
+                return -1;
+            } else if (diff < 0.0) {
+                return 1;
+            }
+            return o1.id.compareTo(o2.id);
+        };
         sortedObjectResources.addAll(allResources.getObjectResources());
+        sortedObjectResources.sort(comparator);
         LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
         return sortedObjectResources;
     }
@@ -375,7 +372,7 @@ public class NodeSorter implements INodeSorter {
      * @param rackId     the rack id availNodes are a part of
      * @return a sorted list of nodes.
      */
-    private TreeSet<ObjectResourcesItem> sortNodes(
+    private List<ObjectResourcesItem> sortNodes(
             List<RasNode> availRasNodes, ExecutorDetails exec, String rackId,
             Map<String, AtomicInteger> scheduledCount) {
         ObjectResourcesSummary rackResourcesSummary = new 
ObjectResourcesSummary("RACK");
@@ -428,7 +425,7 @@ public class NodeSorter implements INodeSorter {
         private final Iterator<String> post;
         private final Set<String> skip;
 
-        LazyNodeSortingIterator(LazyNodeSorting parent, 
TreeSet<ObjectResourcesItem> sortedRacks) {
+        LazyNodeSortingIterator(LazyNodeSorting parent, 
List<ObjectResourcesItem> sortedRacks) {
             this.parent = parent;
             rackIterator = sortedRacks.iterator();
             pre = favoredNodeIds.iterator();
@@ -495,8 +492,8 @@ public class NodeSorter implements INodeSorter {
 
     private class LazyNodeSorting implements Iterable<String> {
         private final Map<String, AtomicInteger> perNodeScheduledCount = new 
HashMap<>();
-        private final TreeSet<ObjectResourcesItem> sortedRacks;
-        private final Map<String, TreeSet<ObjectResourcesItem>> cachedNodes = 
new HashMap<>();
+        private final List<ObjectResourcesItem> sortedRacks;
+        private final Map<String, List<ObjectResourcesItem>> cachedNodes = new 
HashMap<>();
         private final ExecutorDetails exec;
         private final Set<String> skippedNodeIds = new HashSet<>();
 
@@ -516,10 +513,10 @@ public class NodeSorter implements INodeSorter {
                         .getAndAdd(entry.getValue().size());
                 }
             }
-            sortedRacks = sortRacks(exec);
+            sortedRacks = getSortedRacks();
         }
 
-        private TreeSet<ObjectResourcesItem> getSortedNodesFor(String rackId) {
+        private List<ObjectResourcesItem> getSortedNodesFor(String rackId) {
             return cachedNodes.computeIfAbsent(rackId,
                 (rid) -> sortNodes(rackIdToNodes.getOrDefault(rid, 
Collections.emptyList()), exec, rid, perNodeScheduledCount));
         }
@@ -531,7 +528,7 @@ public class NodeSorter implements INodeSorter {
     }
 
     @Override
-    public Iterable<String> sortAllNodes(ExecutorDetails exec) {
+    public Iterable<String> sortAllNodes() {
         return new LazyNodeSorting(exec);
     }
 
@@ -590,8 +587,7 @@ public class NodeSorter implements INodeSorter {
      *
      * @return a sorted list of racks
      */
-    @Override
-    public TreeSet<ObjectResourcesItem> sortRacks(ExecutorDetails exec) {
+    public List<ObjectResourcesItem> getSortedRacks() {
 
         final ObjectResourcesSummary clusterResourcesSummary = 
createClusterSummarizedResources();
         final Map<String, AtomicInteger> scheduledCount = 
getScheduledExecCntByRackId();
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java
similarity index 66%
copy from 
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java
copy to 
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java
index 85a9015..64f9f08 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java
@@ -21,6 +21,7 @@ package 
org.apache.storm.scheduler.resource.strategies.scheduling.sorter;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -32,7 +33,6 @@ import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-
 import org.apache.storm.Config;
 import org.apache.storm.networktopography.DNSToSwitchMapping;
 import org.apache.storm.scheduler.Cluster;
@@ -47,11 +47,12 @@ import 
org.apache.storm.scheduler.resource.normalization.NormalizedResourceReque
 import 
org.apache.storm.scheduler.resource.strategies.scheduling.BaseResourceAwareStrategy;
 import 
org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesItem;
 import 
org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesSummary;
+import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class NodeSorter implements INodeSorter {
-    private static final Logger LOG = 
LoggerFactory.getLogger(NodeSorter.class);
+public class NodeSorterHostProximity implements INodeSorter {
+    private static final Logger LOG = 
LoggerFactory.getLogger(NodeSorterHostProximity.class);
 
     // instance variables from class instantiation
     protected final BaseResourceAwareStrategy.NodeSortType nodeSortType;
@@ -60,56 +61,60 @@ public class NodeSorter implements INodeSorter {
     protected TopologyDetails topologyDetails;
 
     // Instance variables derived from Cluster.
-    private final Map<String, List<String>> networkTopography;
     private final Map<String, String> superIdToRack = new HashMap<>();
     private final Map<String, List<RasNode>> hostnameToNodes = new HashMap<>();
-    private final Map<String, List<RasNode>> rackIdToNodes = new HashMap<>();
+    private final Map<String, String> nodeIdToHostname = new HashMap<>();
+    private final Map<String, Set<String>> rackIdToHosts = new HashMap<>();
     protected List<String> greyListedSupervisorIds;
 
     // Instance variables from Cluster and TopologyDetails.
     protected List<String> favoredNodeIds;
     protected List<String> unFavoredNodeIds;
 
+    // Updated in prepare method
+    ExecutorDetails exec;
+
+    public NodeSorterHostProximity(Cluster cluster, TopologyDetails 
topologyDetails) {
+        this(cluster, topologyDetails, 
BaseResourceAwareStrategy.NodeSortType.COMMON);
+    }
+
     /**
      * Initialize for the default implementation node sorting.
      *
      * <p>
      *  <li>{@link BaseResourceAwareStrategy.NodeSortType#GENERIC_RAS} sorting 
implemented in
-     *  {@link #sortObjectResourcesGeneric(ObjectResourcesSummary, 
ExecutorDetails, NodeSorter.ExistingScheduleFunc)}</li>
+     *  {@link #sortObjectResourcesGeneric(ObjectResourcesSummary, 
ExecutorDetails, NodeSorterHostProximity.ExistingScheduleFunc)}</li>
      *  <li>{@link BaseResourceAwareStrategy.NodeSortType#DEFAULT_RAS} sorting 
implemented in
-     *  {@link #sortObjectResourcesDefault(ObjectResourcesSummary, 
NodeSorter.ExistingScheduleFunc)}</li>
+     *  {@link #sortObjectResourcesDefault(ObjectResourcesSummary, 
NodeSorterHostProximity.ExistingScheduleFunc)}</li>
      *  <li>{@link BaseResourceAwareStrategy.NodeSortType#COMMON} sorting 
implemented in
-     *  {@link #sortObjectResourcesCommon(ObjectResourcesSummary, 
ExecutorDetails, NodeSorter.ExistingScheduleFunc)}</li>
+     *  {@link #sortObjectResourcesCommon(ObjectResourcesSummary, 
ExecutorDetails, NodeSorterHostProximity.ExistingScheduleFunc)}</li>
      * </p>
      *
      * @param cluster for which nodes will be sorted.
      * @param topologyDetails the topology to sort for.
      * @param nodeSortType type of sorting to be applied to object resource 
collection {@link BaseResourceAwareStrategy.NodeSortType}.
      */
-    public NodeSorter(Cluster cluster, TopologyDetails topologyDetails, 
BaseResourceAwareStrategy.NodeSortType nodeSortType) {
+    public NodeSorterHostProximity(Cluster cluster, TopologyDetails 
topologyDetails, BaseResourceAwareStrategy.NodeSortType nodeSortType) {
         this.cluster = cluster;
         this.topologyDetails = topologyDetails;
         this.nodeSortType = nodeSortType;
 
         // from Cluster
-        networkTopography = cluster.getNetworkTopography();
-        Map<String, String> hostToRack = new HashMap<>();
-        for (Map.Entry<String, List<String>> entry : 
networkTopography.entrySet()) {
-            String rackId = entry.getKey();
-            for (String hostName: entry.getValue()) {
-                hostToRack.put(hostName, rackId);
-            }
-        }
+        greyListedSupervisorIds = cluster.getGreyListedSupervisors();
+        Map<String, String> hostToRack = cluster.getHostToRack();
         RasNodes nodes = new RasNodes(cluster);
         for (RasNode node: nodes.getNodes()) {
             String superId = node.getId();
             String hostName = node.getHostname();
+            if (!node.isAlive() || hostName == null) {
+                continue;
+            }
             String rackId = hostToRack.getOrDefault(hostName, 
DNSToSwitchMapping.DEFAULT_RACK);
             superIdToRack.put(superId, rackId);
             hostnameToNodes.computeIfAbsent(hostName, (hn) -> new 
ArrayList<>()).add(node);
-            rackIdToNodes.computeIfAbsent(rackId, (hn) -> new 
ArrayList<>()).add(node);
+            nodeIdToHostname.put(superId, hostName);
+            rackIdToHosts.computeIfAbsent(rackId, r -> new 
HashSet<>()).add(hostName);
         }
-        this.greyListedSupervisorIds = cluster.getGreyListedSupervisors();
 
         // from TopologyDetails
         Map<String, Object> topoConf = topologyDetails.getConf();
@@ -122,16 +127,26 @@ public class NodeSorter implements INodeSorter {
         unFavoredNodeIds.removeAll(favoredNodeIds);
     }
 
+    @VisibleForTesting
+    public Map<String, Set<String>> getRackIdToHosts() {
+        return rackIdToHosts;
+    }
+
+    @Override
+    public void prepare(ExecutorDetails exec) {
+        this.exec = exec;
+    }
+
     /**
-     * Scheduling uses {@link #sortAllNodes(ExecutorDetails)} which eventually
-     * calls this method whose behavior can altered by setting {@link 
#nodeSortType}.
+     * Scheduling uses {@link #sortAllNodes()} which eventually
+     * calls this method whose behavior can be altered by setting {@link 
#nodeSortType}.
      *
      * @param resourcesSummary     contains all individual {@link 
ObjectResourcesItem} as well as cumulative stats
      * @param exec                 executor for which the sorting is done
      * @param existingScheduleFunc a function to get existing executors 
already scheduled on this object
-     * @return a sorted list of {@link ObjectResourcesItem}
+     * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
      */
-    protected TreeSet<ObjectResourcesItem> sortObjectResources(
+    protected Iterable<ObjectResourcesItem> sortObjectResources(
             ObjectResourcesSummary resourcesSummary, ExecutorDetails exec, 
ExistingScheduleFunc existingScheduleFunc) {
         switch (nodeSortType) {
             case DEFAULT_RAS:
@@ -178,9 +193,9 @@ public class NodeSorter implements INodeSorter {
      * @param allResources         contains all individual ObjectResources as 
well as cumulative stats
      * @param exec                 executor for which the sorting is done
      * @param existingScheduleFunc a function to get existing executors 
already scheduled on this object
-     * @return a sorted list of ObjectResources
+     * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
      */
-    private TreeSet<ObjectResourcesItem> sortObjectResourcesCommon(
+    private Iterable<ObjectResourcesItem> sortObjectResourcesCommon(
             final ObjectResourcesSummary allResources, final ExecutorDetails 
exec,
             final ExistingScheduleFunc existingScheduleFunc) {
         // Copy and modify allResources
@@ -189,11 +204,11 @@ public class NodeSorter implements INodeSorter {
         final NormalizedResourceRequest requestedResources = (exec != null) ? 
topologyDetails.getTotalResources(exec) : null;
         affinityBasedAllResources.getObjectResources().forEach(
             x -> {
-                x.minResourcePercent = 
availableResourcesOverall.calculateMinPercentageUsedBy(x.availableResources);
                 if (requestedResources != null) {
                     // negate unrequested resources
                     
x.availableResources.updateForRareResourceAffinity(requestedResources);
                 }
+                x.minResourcePercent = 
availableResourcesOverall.calculateMinPercentageUsedBy(x.availableResources);
                 x.avgResourcePercent = 
availableResourcesOverall.calculateAveragePercentageUsedBy(x.availableResources);
 
                 LOG.trace("for {}: minResourcePercent={}, 
avgResourcePercent={}, numExistingSchedule={}",
@@ -202,34 +217,30 @@ public class NodeSorter implements INodeSorter {
             }
         );
 
-        // Use the following comparator to return a sorted set
-        TreeSet<ObjectResourcesItem> sortedObjectResources =
-                new TreeSet<>((o1, o2) -> {
-                    int execsScheduled1 = 
existingScheduleFunc.getNumExistingSchedule(o1.id);
-                    int execsScheduled2 = 
existingScheduleFunc.getNumExistingSchedule(o2.id);
-                    if (execsScheduled1 > execsScheduled2) {
-                        return -1;
-                    } else if (execsScheduled1 < execsScheduled2) {
-                        return 1;
-                    } else {
-                        double o1Avg = o1.avgResourcePercent;
-                        double o2Avg = o2.avgResourcePercent;
-
-                        if (o1Avg > o2Avg) {
-                            return -1;
-                        } else if (o1Avg < o2Avg) {
-                            return 1;
-                        } else {
-                            if (o1.minResourcePercent > o2.minResourcePercent) 
{
-                                return -1;
-                            } else if (o1.minResourcePercent < 
o2.minResourcePercent) {
-                                return 1;
-                            } else {
-                                return o1.id.compareTo(o2.id);
-                            }
-                        }
-                    }
-                });
+        // Use the following comparator to sort
+        Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+            int execsScheduled1 = 
existingScheduleFunc.getNumExistingSchedule(o1.id);
+            int execsScheduled2 = 
existingScheduleFunc.getNumExistingSchedule(o2.id);
+            if (execsScheduled1 > execsScheduled2) {
+                return -1;
+            } else if (execsScheduled1 < execsScheduled2) {
+                return 1;
+            }
+            double o1Avg = o1.avgResourcePercent;
+            double o2Avg = o2.avgResourcePercent;
+            if (o1Avg > o2Avg) {
+                return -1;
+            } else if (o1Avg < o2Avg) {
+                return 1;
+            }
+            if (o1.minResourcePercent > o2.minResourcePercent) {
+                return -1;
+            } else if (o1.minResourcePercent < o2.minResourcePercent) {
+                return 1;
+            }
+            return o1.id.compareTo(o2.id);
+        };
+        TreeSet<ObjectResourcesItem> sortedObjectResources = new 
TreeSet(comparator);
         
sortedObjectResources.addAll(affinityBasedAllResources.getObjectResources());
         LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
         return sortedObjectResources;
@@ -255,40 +266,48 @@ public class NodeSorter implements INodeSorter {
      * @param allResources         contains all individual ObjectResources as 
well as cumulative stats
      * @param exec                 executor for which the sorting is done
      * @param existingScheduleFunc a function to get existing executors 
already scheduled on this object
-     * @return a sorted list of ObjectResources
+     * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
      */
     @Deprecated
-    private TreeSet<ObjectResourcesItem> sortObjectResourcesGeneric(
+    private Iterable<ObjectResourcesItem> sortObjectResourcesGeneric(
             final ObjectResourcesSummary allResources, ExecutorDetails exec,
             final ExistingScheduleFunc existingScheduleFunc) {
         ObjectResourcesSummary affinityBasedAllResources = new 
ObjectResourcesSummary(allResources);
-        NormalizedResourceRequest requestedResources = 
topologyDetails.getTotalResources(exec);
-        for (ObjectResourcesItem objectResources : 
affinityBasedAllResources.getObjectResources()) {
-            
objectResources.availableResources.updateForRareResourceAffinity(requestedResources);
-        }
         final NormalizedResourceOffer availableResourcesOverall = 
allResources.getAvailableResourcesOverall();
+        final NormalizedResourceRequest requestedResources = (exec != null) ? 
topologyDetails.getTotalResources(exec) : null;
+        affinityBasedAllResources.getObjectResources().forEach(
+            x -> {
+                if (requestedResources != null) {
+                    // negate unrequested resources
+                    
x.availableResources.updateForRareResourceAffinity(requestedResources);
+                }
+                x.minResourcePercent = 
availableResourcesOverall.calculateMinPercentageUsedBy(x.availableResources);
+                x.avgResourcePercent = 
availableResourcesOverall.calculateAveragePercentageUsedBy(x.availableResources);
 
-        TreeSet<ObjectResourcesItem> sortedObjectResources =
-                new TreeSet<>((o1, o2) -> {
-                    int execsScheduled1 = 
existingScheduleFunc.getNumExistingSchedule(o1.id);
-                    int execsScheduled2 = 
existingScheduleFunc.getNumExistingSchedule(o2.id);
-                    if (execsScheduled1 > execsScheduled2) {
-                        return -1;
-                    } else if (execsScheduled1 < execsScheduled2) {
-                        return 1;
-                    } else {
-                        double o1Avg = 
availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources);
-                        double o2Avg = 
availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources);
-
-                        if (o1Avg > o2Avg) {
-                            return -1;
-                        } else if (o1Avg < o2Avg) {
-                            return 1;
-                        } else {
-                            return o1.id.compareTo(o2.id);
-                        }
-                    }
-                });
+                LOG.trace("for {}: minResourcePercent={}, 
avgResourcePercent={}, numExistingSchedule={}",
+                    x.id, x.minResourcePercent, x.avgResourcePercent,
+                    existingScheduleFunc.getNumExistingSchedule(x.id));
+            }
+        );
+
+        Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+            int execsScheduled1 = 
existingScheduleFunc.getNumExistingSchedule(o1.id);
+            int execsScheduled2 = 
existingScheduleFunc.getNumExistingSchedule(o2.id);
+            if (execsScheduled1 > execsScheduled2) {
+                return -1;
+            } else if (execsScheduled1 < execsScheduled2) {
+                return 1;
+            }
+            double o1Avg = o1.avgResourcePercent;
+            double o2Avg = o2.avgResourcePercent;
+            if (o1Avg > o2Avg) {
+                return -1;
+            } else if (o1Avg < o2Avg) {
+                return 1;
+            }
+            return o1.id.compareTo(o2.id);
+        };
+        TreeSet<ObjectResourcesItem> sortedObjectResources = new 
TreeSet<>(comparator);
         
sortedObjectResources.addAll(affinityBasedAllResources.getObjectResources());
         LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
         return sortedObjectResources;
@@ -310,10 +329,10 @@ public class NodeSorter implements INodeSorter {
      *
      * @param allResources         contains all individual ObjectResources as 
well as cumulative stats
      * @param existingScheduleFunc a function to get existing executors 
already scheduled on this object
-     * @return a sorted list of ObjectResources
+     * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
      */
     @Deprecated
-    private TreeSet<ObjectResourcesItem> sortObjectResourcesDefault(
+    private Iterable<ObjectResourcesItem> sortObjectResourcesDefault(
             final ObjectResourcesSummary allResources,
             final ExistingScheduleFunc existingScheduleFunc) {
 
@@ -328,31 +347,28 @@ public class NodeSorter implements INodeSorter {
                     
existingScheduleFunc.getNumExistingSchedule(objectResources.id));
         }
 
-        TreeSet<ObjectResourcesItem> sortedObjectResources =
-                new TreeSet<>((o1, o2) -> {
-                    int execsScheduled1 = 
existingScheduleFunc.getNumExistingSchedule(o1.id);
-                    int execsScheduled2 = 
existingScheduleFunc.getNumExistingSchedule(o2.id);
-                    if (execsScheduled1 > execsScheduled2) {
-                        return -1;
-                    } else if (execsScheduled1 < execsScheduled2) {
-                        return 1;
-                    } else {
-                        if (o1.minResourcePercent > o2.minResourcePercent) {
-                            return -1;
-                        } else if (o1.minResourcePercent < 
o2.minResourcePercent) {
-                            return 1;
-                        } else {
-                            double diff = o1.avgResourcePercent - 
o2.avgResourcePercent;
-                            if (diff > 0.0) {
-                                return -1;
-                            } else if (diff < 0.0) {
-                                return 1;
-                            } else {
-                                return o1.id.compareTo(o2.id);
-                            }
-                        }
-                    }
-                });
+        Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+            int execsScheduled1 = 
existingScheduleFunc.getNumExistingSchedule(o1.id);
+            int execsScheduled2 = 
existingScheduleFunc.getNumExistingSchedule(o2.id);
+            if (execsScheduled1 > execsScheduled2) {
+                return -1;
+            } else if (execsScheduled1 < execsScheduled2) {
+                return 1;
+            }
+            if (o1.minResourcePercent > o2.minResourcePercent) {
+                return -1;
+            } else if (o1.minResourcePercent < o2.minResourcePercent) {
+                return 1;
+            }
+            double diff = o1.avgResourcePercent - o2.avgResourcePercent;
+            if (diff > 0.0) {
+                return -1;
+            } else if (diff < 0.0) {
+                return 1;
+            }
+            return o1.id.compareTo(o2.id);
+        };
+        TreeSet<ObjectResourcesItem> sortedObjectResources = new 
TreeSet<>(comparator);
         sortedObjectResources.addAll(allResources.getObjectResources());
         LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
         return sortedObjectResources;
@@ -371,28 +387,75 @@ public class NodeSorter implements INodeSorter {
      * calculation, nodes nodes that have more balanced resource availability. 
So we will be less likely to pick a node that have a lot of
      * one resource but a low amount of another.
      *
-     * @param availRasNodes a list of all the nodes we want to sort
+     * @param availHosts a collection of all the hosts we want to sort
      * @param rackId     the rack id availNodes are a part of
-     * @return a sorted list of nodes.
+     * @return an iterable of sorted hosts.
      */
-    private TreeSet<ObjectResourcesItem> sortNodes(
-            List<RasNode> availRasNodes, ExecutorDetails exec, String rackId,
+    private Iterable<ObjectResourcesItem> sortHosts(
+            Collection<String> availHosts, ExecutorDetails exec, String rackId,
             Map<String, AtomicInteger> scheduledCount) {
         ObjectResourcesSummary rackResourcesSummary = new 
ObjectResourcesSummary("RACK");
+        availHosts.forEach(h -> {
+            ObjectResourcesItem hostItem = new ObjectResourcesItem(h);
+            for (RasNode x : hostnameToNodes.get(h)) {
+                hostItem.add(new ObjectResourcesItem(x.getId(), 
x.getTotalAvailableResources(), x.getTotalResources(), 0, 0));
+            }
+            rackResourcesSummary.addObjectResourcesItem(hostItem);
+        });
+
+        LOG.debug(
+                "Rack {}: Overall Avail [ {} ] Total [ {} ]",
+                rackId,
+                rackResourcesSummary.getAvailableResourcesOverall(),
+                rackResourcesSummary.getTotalResourcesOverall());
+
+        return sortObjectResources(
+            rackResourcesSummary,
+            exec,
+            (hostId) -> {
+                AtomicInteger count = scheduledCount.get(hostId);
+                if (count == null) {
+                    return 0;
+                }
+                return count.get();
+            });
+    }
+
+    /**
+     * Nodes are sorted by two criteria.
+     *
+     * <p>1) the number of executors of the topology being scheduled that 
are already on the node, in
+     * descending order. The reasoning to sort based on criterion 1 is so we 
schedule the rest of a topology on the same node as the
+     * existing executors of the topology.
+     *
+     * <p>2) the subordinate/subservient resource availability percentage of a 
node, in descending
+     * order. We calculate the resource availability percentage by dividing the 
resource availability on the node by the resource
+     * availability of the entire host. By doing this calculation, nodes that 
have exhausted or have little of one of the resources
+     * mentioned above will be ranked after nodes that have more balanced 
resource availability. So we will be less likely to pick a
+     * node that has a lot of one resource but a low amount of another.
+     *
+     * @param availRasNodes a list of all the nodes we want to sort
+     * @param hostId     the id of the host that availRasNodes are a part of
+     * @return an {@link Iterable} of sorted {@link ObjectResourcesItem} for 
nodes.
+     */
+    private Iterable<ObjectResourcesItem> sortNodes(
+            List<RasNode> availRasNodes, ExecutorDetails exec, String hostId,
+            Map<String, AtomicInteger> scheduledCount) {
+        ObjectResourcesSummary hostResourcesSummary = new 
ObjectResourcesSummary("HOST");
         availRasNodes.forEach(x ->
-                rackResourcesSummary.addObjectResourcesItem(
+                hostResourcesSummary.addObjectResourcesItem(
                         new ObjectResourcesItem(x.getId(), 
x.getTotalAvailableResources(), x.getTotalResources(), 0, 0)
                 )
         );
 
         LOG.debug(
-            "Rack {}: Overall Avail [ {} ] Total [ {} ]",
-            rackId,
-            rackResourcesSummary.getAvailableResourcesOverall(),
-            rackResourcesSummary.getTotalResourcesOverall());
+            "Host {}: Overall Avail [ {} ] Total [ {} ]",
+            hostId,
+            hostResourcesSummary.getAvailableResourcesOverall(),
+            hostResourcesSummary.getTotalResourcesOverall());
 
         return sortObjectResources(
-            rackResourcesSummary,
+            hostResourcesSummary,
             exec,
             (superId) -> {
                 AtomicInteger count = scheduledCount.get(superId);
@@ -422,13 +485,14 @@ public class NodeSorter implements INodeSorter {
     private class LazyNodeSortingIterator implements Iterator<String> {
         private final LazyNodeSorting parent;
         private final Iterator<ObjectResourcesItem> rackIterator;
+        private Iterator<ObjectResourcesItem> hostIterator;
         private Iterator<ObjectResourcesItem> nodeIterator;
         private String nextValueFromNode = null;
         private final Iterator<String> pre;
         private final Iterator<String> post;
         private final Set<String> skip;
 
-        LazyNodeSortingIterator(LazyNodeSorting parent, 
TreeSet<ObjectResourcesItem> sortedRacks) {
+        LazyNodeSortingIterator(LazyNodeSorting parent, 
Iterable<ObjectResourcesItem> sortedRacks) {
             this.parent = parent;
             rackIterator = sortedRacks.iterator();
             pre = favoredNodeIds.iterator();
@@ -442,11 +506,20 @@ public class NodeSorter implements INodeSorter {
             if (nodeIterator != null && nodeIterator.hasNext()) {
                 return nodeIterator;
             }
-            //need to get the next node iterator
+            //need to get the next host/node iterator
+            if (hostIterator != null && hostIterator.hasNext()) {
+                ObjectResourcesItem host = hostIterator.next();
+                final String hostId = host.id;
+                nodeIterator = parent.getSortedNodesForHost(hostId).iterator();
+                return nodeIterator;
+            }
             if (rackIterator.hasNext()) {
                 ObjectResourcesItem rack = rackIterator.next();
                 final String rackId = rack.id;
-                nodeIterator = parent.getSortedNodesFor(rackId).iterator();
+                hostIterator = parent.getSortedHostsForRack(rackId).iterator();
+                ObjectResourcesItem host = hostIterator.next();
+                final String hostId = host.id;
+                nodeIterator = parent.getSortedNodesForHost(hostId).iterator();
                 return nodeIterator;
             }
 
@@ -494,9 +567,11 @@ public class NodeSorter implements INodeSorter {
     }
 
     private class LazyNodeSorting implements Iterable<String> {
+        private final Map<String, AtomicInteger> perHostScheduledCount = new 
HashMap<>();
         private final Map<String, AtomicInteger> perNodeScheduledCount = new 
HashMap<>();
-        private final TreeSet<ObjectResourcesItem> sortedRacks;
-        private final Map<String, TreeSet<ObjectResourcesItem>> cachedNodes = 
new HashMap<>();
+        private final Iterable<ObjectResourcesItem> sortedRacks;
+        private final Map<String, Iterable<ObjectResourcesItem>> cachedHosts = 
new HashMap<>();
+        private final Map<String, Iterable<ObjectResourcesItem>> 
cachedNodesByHost = new HashMap<>();
         private final ExecutorDetails exec;
         private final Set<String> skippedNodeIds = new HashSet<>();
 
@@ -512,16 +587,24 @@ public class NodeSorter implements INodeSorter {
                 for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry :
                     assignment.getSlotToExecutors().entrySet()) {
                     String superId = entry.getKey().getNodeId();
-                    perNodeScheduledCount.computeIfAbsent(superId, (sid) -> 
new AtomicInteger(0))
+                    String hostId = nodeIdToHostname.get(superId);
+                    perHostScheduledCount.computeIfAbsent(hostId, id -> new 
AtomicInteger(0))
+                        .getAndAdd(entry.getValue().size());
+                    perNodeScheduledCount.computeIfAbsent(superId, id -> new 
AtomicInteger(0))
                         .getAndAdd(entry.getValue().size());
                 }
             }
-            sortedRacks = sortRacks(exec);
+            sortedRacks = getSortedRacks();
+        }
+
+        private Iterable<ObjectResourcesItem> getSortedHostsForRack(String 
rackId) {
+            return cachedHosts.computeIfAbsent(rackId,
+                id -> sortHosts(rackIdToHosts.getOrDefault(id, 
Collections.emptySet()), exec, id, perHostScheduledCount));
         }
 
-        private TreeSet<ObjectResourcesItem> getSortedNodesFor(String rackId) {
-            return cachedNodes.computeIfAbsent(rackId,
-                (rid) -> sortNodes(rackIdToNodes.getOrDefault(rid, 
Collections.emptyList()), exec, rid, perNodeScheduledCount));
+        private Iterable<ObjectResourcesItem> getSortedNodesForHost(String 
hostId) {
+            return cachedNodesByHost.computeIfAbsent(hostId,
+                id -> sortNodes(hostnameToNodes.getOrDefault(id, 
Collections.emptyList()), exec, id, perNodeScheduledCount));
         }
 
         @Override
@@ -531,37 +614,38 @@ public class NodeSorter implements INodeSorter {
     }
 
     @Override
-    public Iterable<String> sortAllNodes(ExecutorDetails exec) {
+    public Iterable<String> sortAllNodes() {
         return new LazyNodeSorting(exec);
     }
 
     private ObjectResourcesSummary createClusterSummarizedResources() {
         ObjectResourcesSummary clusterResourcesSummary = new 
ObjectResourcesSummary("Cluster");
-
-        //This is the first time so initialize the resources.
-        for (Map.Entry<String, List<String>> entry : 
networkTopography.entrySet()) {
-            String rackId = entry.getKey();
-            List<String> nodeHosts = entry.getValue();
-            ObjectResourcesItem rack = new ObjectResourcesItem(rackId);
-            for (String nodeHost : nodeHosts) {
-                for (RasNode node : hostnameToNodes(nodeHost)) {
-                    
rack.availableResources.add(node.getTotalAvailableResources());
-                    rack.totalResources.add(node.getTotalAvailableResources());
+        rackIdToHosts.forEach((rackId, hostIds) -> {
+            if (hostIds == null || hostIds.isEmpty()) {
+                LOG.info("Ignoring Rack {} since it has no hosts", rackId);
+            } else {
+                ObjectResourcesItem rack = new ObjectResourcesItem(rackId);
+                for (String hostId : hostIds) {
+                    for (RasNode node : hostnameToNodes(hostId)) {
+                        
rack.availableResources.add(node.getTotalAvailableResources());
+                        rack.totalResources.add(node.getTotalResources());
+                    }
                 }
+                clusterResourcesSummary.addObjectResourcesItem(rack);
             }
-            clusterResourcesSummary.addObjectResourcesItem(rack);
-        }
+        });
 
         LOG.debug(
-            "Cluster Overall Avail [ {} ] Total [ {} ]",
+            "Cluster Overall Avail [ {} ] Total [ {} ], rackCnt={}, 
hostCnt={}",
             clusterResourcesSummary.getAvailableResourcesOverall(),
-            clusterResourcesSummary.getTotalResourcesOverall());
+            clusterResourcesSummary.getTotalResourcesOverall(),
+            clusterResourcesSummary.getObjectResources().size(),
+            rackIdToHosts.values().stream().mapToInt(x -> x.size()).sum());
         return clusterResourcesSummary;
     }
 
-    private Map<String, AtomicInteger> getScheduledExecCntByRackId() {
-        String topoId = topologyDetails.getId();
-        SchedulerAssignment assignment = cluster.getAssignmentById(topoId);
+    public Map<String, AtomicInteger> getScheduledExecCntByRackId() {
+        SchedulerAssignment assignment = 
cluster.getAssignmentById(topologyDetails.getId());
         Map<String, AtomicInteger> scheduledCount = new HashMap<>();
         if (assignment != null) {
             for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry :
@@ -588,10 +672,9 @@ public class NodeSorter implements INodeSorter {
      * racks that have more balanced resource availability. So we will be less 
likely to pick a rack that have a lot of one resource but a
      * low amount of another.
      *
-     * @return a sorted list of racks
+     * @return an iterable of sorted racks
      */
-    @Override
-    public TreeSet<ObjectResourcesItem> sortRacks(ExecutorDetails exec) {
+    public Iterable<ObjectResourcesItem> getSortedRacks() {
 
         final ObjectResourcesSummary clusterResourcesSummary = 
createClusterSummarizedResources();
         final Map<String, AtomicInteger> scheduledCount = 
getScheduledExecCntByRackId();
diff --git 
a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
 
b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index 11de468..dcffa99 100644
--- 
a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ 
b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -189,13 +189,34 @@ public class TestUtilsForResourceAwareScheduler {
         }
     }
 
-    public static Map<String, SupervisorDetails> genSupervisorsWithRacks(int 
numRacks, int numSupersPerRack, int numPorts, int rackStart,
-                                                                         int 
superInRackStart, double cpu, double mem,
-                                                                         
Map<String, Double> miscResources) {
-        Map<String, Double> resourceMap = new HashMap<>();
-        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, cpu);
-        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem);
-        resourceMap.putAll(miscResources);
+    public static Map<String, SupervisorDetails> genSupervisorsWithRacks(
+            int numRacks, int numSupersPerRack, int numPorts, int rackStart, 
int superInRackStart,
+            double cpu, double mem, Map<String, Double> miscResources) {
+
+        return genSupervisorsWithRacksAndNuma(numRacks, numSupersPerRack, 1, 
numPorts, rackStart,
+                superInRackStart, cpu, mem, miscResources, 1.0);
+    }
+
+    /**
+     * Takes one additional parameter numaZonesPerHost. This parameter 
determines how many supervisors
+     * will be created on the same host. If numaResourceMultiplier is set to a 
factor below 1.0, then
+     * each subsequent numa zone will have correspondingly lower cpu/mem than 
the previous numa zone.
+     *
+     * @param numRacks
+     * @param numSupersPerRack
+     * @param numaZonesPerHost
+     * @param numPorts
+     * @param rackStart
+     * @param superInRackStart
+     * @param cpu
+     * @param mem
+     * @param miscResources
+     * @param numaResourceMultiplier - cpu/mem resource for each numaZone is 
multiplied by this factor to obtain uneven resources
+     * @return
+     */
+    public static Map<String, SupervisorDetails> 
genSupervisorsWithRacksAndNuma(
+            int numRacks, int numSupersPerRack, int numaZonesPerHost, int 
numPorts, int rackStart, int superInRackStart,
+            double cpu, double mem, Map<String, Double> miscResources, double 
numaResourceMultiplier) {
         Map<String, SupervisorDetails> retList = new HashMap<>();
         for (int rack = rackStart; rack < numRacks + rackStart; rack++) {
             for (int superInRack = superInRackStart; superInRack < 
(numSupersPerRack + superInRackStart); superInRack++) {
@@ -203,8 +224,23 @@ public class TestUtilsForResourceAwareScheduler {
                 for (int p = 0; p < numPorts; p++) {
                     ports.add(p);
                 }
-                SupervisorDetails sup = new 
SupervisorDetails(String.format("r%03ds%03d", rack, superInRack),
-                    String.format("host-%03d-rack-%03d", superInRack, rack), 
null, ports,
+                String superId;
+                String host;
+                int numaZone = superInRack % numaZonesPerHost;
+                if (numaZonesPerHost > 1) {
+                    // multiple supervisors per host
+                    int hostInRack = superInRack / numaZonesPerHost;
+                    superId = String.format("r%03ds%03dn%d", rack, 
superInRack, numaZone);
+                    host = String.format("host-%03d-rack-%03d", hostInRack, 
rack);
+                } else {
+                    superId = String.format("r%03ds%03d", rack, superInRack);
+                    host = String.format("host-%03d-rack-%03d", superInRack, 
rack);
+                }
+                Map<String, Double> resourceMap = new HashMap<>();
+                resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, cpu * 
Math.pow(numaResourceMultiplier, numaZone));
+                resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem * 
Math.pow(numaResourceMultiplier, numaZone));
+                resourceMap.putAll(miscResources);
+                SupervisorDetails sup = new SupervisorDetails(superId, host, 
null, ports,
                     
NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceMap));
                 retList.put(sup.getId(), sup);
 
diff --git 
a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
 
b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
index 96a0f9a..fce3065 100644
--- 
a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
+++ 
b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
@@ -22,6 +22,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.config.Configurator;
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.scheduler.Cluster;
@@ -36,6 +39,7 @@ import 
org.apache.storm.scheduler.resource.ResourceAwareScheduler;
 import org.apache.storm.scheduler.resource.SchedulingResult;
 import 
org.apache.storm.scheduler.resource.strategies.scheduling.sorter.ExecSorterByConstraintSeverity;
 import 
org.apache.storm.scheduler.resource.strategies.scheduling.sorter.IExecSorter;
+import 
org.apache.storm.scheduler.resource.strategies.scheduling.sorter.NodeSorterHostProximity;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
 import org.json.simple.JSONValue;
@@ -77,6 +81,12 @@ public class TestConstraintSolverStrategy {
 
     public TestConstraintSolverStrategy(boolean consolidatedConfigFlag) {
         this.consolidatedConfigFlag = consolidatedConfigFlag;
+        List<Class> classesToDebug = 
Arrays.asList(TestConstraintSolverStrategy.class,
+                BaseResourceAwareStrategy.class, ResourceAwareScheduler.class,
+                NodeSorterHostProximity.class, Cluster.class
+        );
+        Level logLevel = Level.INFO ; // switch to Level.DEBUG for verbose 
otherwise Level.INFO
+        classesToDebug.forEach(x -> Configurator.setLevel(x.getName(), 
logLevel));
         LOG.info("Running tests with consolidatedConfigFlag={}", 
consolidatedConfigFlag);
     }
 
@@ -180,15 +190,15 @@ public class TestConstraintSolverStrategy {
         return makeTestTopoConf(1);
     }
 
-    public TopologyDetails makeTopology(Map<String, Object> config, int 
boltParallel) {
+    public static TopologyDetails makeTopology(Map<String, Object> config, int 
boltParallel) {
         return genTopology("testTopo", config, 1, 4, 4, boltParallel, 0, 0, 
"user");
     }
 
-    public Cluster makeCluster(Topologies topologies) {
+    public static Cluster makeCluster(Topologies topologies) {
         return makeCluster(topologies, null);
     }
 
-    public Cluster makeCluster(Topologies topologies, Map<String, 
SupervisorDetails> supMap) {
+    public static Cluster makeCluster(Topologies topologies, Map<String, 
SupervisorDetails> supMap) {
         if (supMap == null) {
             supMap = genSupervisors(4, 2, 120, 1200);
         }
diff --git 
a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
 
b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index 1b1c035..b6f2a20 100644
--- 
a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ 
b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -44,6 +44,7 @@ import 
org.apache.storm.scheduler.resource.ResourceAwareScheduler;
 import org.apache.storm.scheduler.resource.SchedulingResult;
 import 
org.apache.storm.scheduler.resource.strategies.scheduling.sorter.INodeSorter;
 import 
org.apache.storm.scheduler.resource.strategies.scheduling.sorter.NodeSorter;
+import 
org.apache.storm.scheduler.resource.strategies.scheduling.sorter.NodeSorterHostProximity;
 import org.apache.storm.topology.SharedOffHeapWithinNode;
 import org.apache.storm.topology.SharedOffHeapWithinWorker;
 import org.apache.storm.topology.SharedOnHeap;
@@ -74,7 +75,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.TreeSet;
+import java.util.stream.Collectors;
+
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
 
@@ -373,6 +375,15 @@ public class TestDefaultResourceAwareStrategy {
         SchedulerAssignment assignment = 
cluster.getAssignmentById(topo.getId());
         TopologyResources topologyResources = 
cluster.getTopologyResourcesMap().get(topo.getId());
         long numNodes = 
assignment.getSlotToExecutors().keySet().stream().map(WorkerSlot::getNodeId).distinct().count();
+        String assignmentString = "Assignments:\n\t" + 
assignment.getSlotToExecutors().entrySet().stream()
+                .map(x -> String.format("Node=%s, components=%s",
+                        x.getKey().getNodeId(),
+                        x.getValue().stream()
+                                .map(y -> topo.getComponentFromExecutor(y))
+                                .collect(Collectors.joining(","))
+                        )
+                )
+                .collect(Collectors.joining("\n\t"));
 
         if (schedulingLimitation == 
WorkerRestrictionType.WORKER_RESTRICTION_NONE) {
             // Everything should fit in a single slot
@@ -411,7 +422,7 @@ public class TestDefaultResourceAwareStrategy {
             int numAssignedWorkers = cluster.getAssignedNumWorkers(topo);
             assertThat(numAssignedWorkers, is(8));
             assertThat(assignment.getSlots().size(), is(8));
-            assertThat(numNodes, is(2L));
+            assertThat(assignmentString, numNodes, is(2L));
         } else if (schedulingLimitation == 
WorkerRestrictionType.WORKER_RESTRICTION_ONE_COMPONENT) {
             double expectedMemOnHeap = (totalNumberOfTasks * memoryOnHeap) + 
sharedOnHeapWithinWorker;
             double expectedMemOffHeap = (totalNumberOfTasks * memoryOffHeap) + 
sharedOffHeapWithinWorker + sharedOffHeapWithinNode;
@@ -766,22 +777,17 @@ public class TestDefaultResourceAwareStrategy {
         for (Map.Entry<String, String> entry : resolvedSuperVisors.entrySet()) 
{
             String hostName = entry.getKey();
             String rack = entry.getValue();
-            List<String> nodesForRack = rackToNodes.get(rack);
-            if (nodesForRack == null) {
-                nodesForRack = new ArrayList<>();
-                rackToNodes.put(rack, nodesForRack);
-            }
-            nodesForRack.add(hostName);
+            rackToNodes.computeIfAbsent(rack, rid -> new 
ArrayList<>()).add(hostName);
         }
         cluster.setNetworkTopography(rackToNodes);
 
         DefaultResourceAwareStrategyOld rs = new 
DefaultResourceAwareStrategyOld();
         
         rs.prepareForScheduling(cluster, topo1);
-        INodeSorter nodeSorter = new NodeSorter(cluster, topo1, 
BaseResourceAwareStrategy.NodeSortType.DEFAULT_RAS);
-        TreeSet<ObjectResourcesItem> sortedRacks = nodeSorter.sortRacks(null);
+        INodeSorter nodeSorter = new NodeSorterHostProximity(cluster, topo1, 
BaseResourceAwareStrategy.NodeSortType.DEFAULT_RAS);
+        nodeSorter.prepare(null);
+        Iterable<ObjectResourcesItem> sortedRacks = 
nodeSorter.getSortedRacks();
 
-        Assert.assertEquals("# of racks sorted", 6, sortedRacks.size());
         Iterator<ObjectResourcesItem> it = sortedRacks.iterator();
         // Ranked first since rack-0 has the most balanced set of resources
         Assert.assertEquals("rack-0 should be ordered first", "rack-0", 
it.next().id);
@@ -904,10 +910,10 @@ public class TestDefaultResourceAwareStrategy {
         DefaultResourceAwareStrategyOld rs = new 
DefaultResourceAwareStrategyOld();
 
         rs.prepareForScheduling(cluster, topo1);
-        INodeSorter nodeSorter = new NodeSorter(cluster, topo1, 
BaseResourceAwareStrategy.NodeSortType.DEFAULT_RAS);
-        TreeSet<ObjectResourcesItem> sortedRacks= nodeSorter.sortRacks(null);
+        INodeSorter nodeSorter = new NodeSorterHostProximity(cluster, topo1, 
BaseResourceAwareStrategy.NodeSortType.DEFAULT_RAS);
+        nodeSorter.prepare(null);
+        Iterable<ObjectResourcesItem> sortedRacks= nodeSorter.getSortedRacks();
 
-        Assert.assertEquals("# of racks sorted", 5, sortedRacks.size());
         Iterator<ObjectResourcesItem> it = sortedRacks.iterator();
         // Ranked first since rack-0 has the most balanced set of resources
         Assert.assertEquals("rack-0 should be ordered first", "rack-0", 
it.next().id);
diff --git 
a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/TestNodeSorterHostProximity.java
 
b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/TestNodeSorterHostProximity.java
new file mode 100644
index 0000000..84b839f
--- /dev/null
+++ 
b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/TestNodeSorterHostProximity.java
@@ -0,0 +1,1036 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling.sorter;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.RasNodes;
+import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler;
+import 
org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import 
org.apache.storm.scheduler.resource.normalization.NormalizedResourcesExtension;
+import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
+import 
org.apache.storm.scheduler.resource.strategies.scheduling.BaseResourceAwareStrategy;
+import 
org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
+import 
org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategy;
+import 
org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesItem;
+import org.apache.storm.topology.TopologyBuilder;
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static 
org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+@ExtendWith({NormalizedResourcesExtension.class})
+public class TestNodeSorterHostProximity {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestNodeSorterHostProximity.class);
+    private static final int CURRENT_TIME = 1450418597;
+
+    protected Class getDefaultResourceAwareStrategyClass() {
+        return DefaultResourceAwareStrategy.class;
+    }
+
+    private Config createClusterConfig(double compPcore, double compOnHeap, 
double compOffHeap,
+                                       Map<String, Map<String, Number>> pools) 
{
+        Config config = 
TestUtilsForResourceAwareScheduler.createClusterConfig(compPcore, compOnHeap, 
compOffHeap, pools);
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, 
getDefaultResourceAwareStrategyClass().getName());
+        return config;
+    }
+
+    private static class TestDNSToSwitchMapping implements DNSToSwitchMapping {
+        private final Map<String, String> hostToRackMap;
+        private final Map<String, List<String>> rackToHosts;
+
+        @SafeVarargs
+        public TestDNSToSwitchMapping(Map<String, SupervisorDetails>... racks) 
{
+            Set<String> seenHosts = new HashSet<>();
+            Map<String, String> hostToRackMap = new HashMap<>();
+            Map<String, List<String>> rackToHosts = new HashMap<>();
+            for (int rackNum = 0; rackNum < racks.length; rackNum++) {
+                String rack = String.format("rack-%03d", rackNum);
+                for (SupervisorDetails sup : racks[rackNum].values()) {
+                    hostToRackMap.put(sup.getHost(), rack);
+                    String host = sup.getHost();
+                    if (!seenHosts.contains(host)) {
+                        rackToHosts.computeIfAbsent(rack, rid -> new 
ArrayList<>()).add(host);
+                        seenHosts.add(host);
+                    }
+                }
+            }
+            this.hostToRackMap = Collections.unmodifiableMap(hostToRackMap);
+            this.rackToHosts = Collections.unmodifiableMap(rackToHosts);
+        }
+
+        /**
+         * Use the "rack-%03d" embedded in the name of the supervisor to 
determine the rack number.
+         *
+         * @param supervisorDetailsCollection
+         */
+        public TestDNSToSwitchMapping(Collection<SupervisorDetails> 
supervisorDetailsCollection) {
+            Set<String> seenHosts = new HashSet<>();
+            Map<String, String> hostToRackMap = new HashMap<>();
+            Map<String, List<String>> rackToHosts = new HashMap<>();
+
+            for (SupervisorDetails supervisorDetails: 
supervisorDetailsCollection) {
+                String rackId = 
supervisorIdToRackName(supervisorDetails.getId());
+                hostToRackMap.put(supervisorDetails.getHost(), rackId);
+                String host = supervisorDetails.getHost();
+                if (!seenHosts.contains(host)) {
+                    rackToHosts.computeIfAbsent(rackId, rid -> new 
ArrayList<>()).add(host);
+                    seenHosts.add(host);
+                }
+            }
+            this.hostToRackMap = Collections.unmodifiableMap(hostToRackMap);
+            this.rackToHosts = Collections.unmodifiableMap(rackToHosts);
+        }
+
+        @Override
+        public Map<String, String> resolve(List<String> names) {
+            return hostToRackMap;
+        }
+
+        public Map<String, List<String>> getRackToHosts() {
+            return rackToHosts;
+        }
+    }
+
+    /**
+     * Test whether strategy will choose correct rack.
+     */
+    @Test
+    public void testMultipleRacks() {
+        final Map<String, SupervisorDetails> supMap = new HashMap<>();
+        final int numRacks = 1;
+        final int numSupersPerRack = 10;
+        final int numPortsPerSuper = 4;
+        final int numZonesPerHost = 1;
+        final double numaResourceMultiplier = 1.0;
+        int rackStartNum = 0;
+        int supStartNum = 0;
+
+        final Map<String, SupervisorDetails> supMapRack0 = 
genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, 
rackStartNum++, supStartNum,
+                400, 8000, Collections.emptyMap(), numaResourceMultiplier);
+
+        //generate another rack of supervisors with less resources
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack1 = 
genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, 
rackStartNum++, supStartNum,
+                200, 4000, Collections.emptyMap(), numaResourceMultiplier);
+
+        //generate some supervisors that are depleted of one resource
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack2 = 
genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, 
rackStartNum++, supStartNum,
+                0, 8000, Collections.emptyMap(), numaResourceMultiplier);
+
+        //generate some that have a lot of memory but little cpu
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack3 = 
genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, 
rackStartNum++, supStartNum,
+                10, 8000 * 2 + 4000, 
Collections.emptyMap(),numaResourceMultiplier);
+
+        //generate some that have a lot of cpu but little memory
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack4 = 
genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, 
rackStartNum++, supStartNum,
+                400 + 200 + 10, 1000, Collections.emptyMap(), 
numaResourceMultiplier);
+
+        //Generate some that have neither resource, to verify that the 
strategy will prioritize this last
+        //Also put a generic resource with 0 value in the resources list, to 
verify that it doesn't affect the sorting
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack5 = 
genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, 
rackStartNum++, supStartNum,
+                0.0, 0.0, Collections.singletonMap("gpu.count", 0.0), 
numaResourceMultiplier);
+
+        supMap.putAll(supMapRack0);
+        supMap.putAll(supMapRack1);
+        supMap.putAll(supMapRack2);
+        supMap.putAll(supMapRack3);
+        supMap.putAll(supMapRack4);
+        supMap.putAll(supMapRack5);
+
+        Config config = createClusterConfig(100, 500, 500, null);
+        config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+        INimbus iNimbus = new INimbusTest();
+
+        //create test DNSToSwitchMapping plugin
+        TestDNSToSwitchMapping testDNSToSwitchMapping =
+                new TestDNSToSwitchMapping(supMapRack0, supMapRack1, 
supMapRack2, supMapRack3, supMapRack4, supMapRack5);
+
+        //generate topologies
+        TopologyDetails topo1 = genTopology("topo-1", config, 8, 0, 2, 0, 
CURRENT_TIME - 2, 10, "user");
+        TopologyDetails topo2 = genTopology("topo-2", config, 8, 0, 2, 0, 
CURRENT_TIME - 2, 10, "user");
+
+        Topologies topologies = new Topologies(topo1, topo2);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new 
StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+
+        List<String> supHostnames = new LinkedList<>();
+        for (SupervisorDetails sup : supMap.values()) {
+            supHostnames.add(sup.getHost());
+        }
+        Map<String, List<String>> rackToHosts = 
testDNSToSwitchMapping.getRackToHosts();
+        cluster.setNetworkTopography(rackToHosts);
+
+        NodeSorterHostProximity nodeSorter = new 
NodeSorterHostProximity(cluster, topo1, 
BaseResourceAwareStrategy.NodeSortType.DEFAULT_RAS);
+        nodeSorter.prepare(null);
+        List<ObjectResourcesItem> sortedRacks = 
StreamSupport.stream(nodeSorter.getSortedRacks().spliterator(), false)
+                .collect(Collectors.toList());
+        String rackSummaries = sortedRacks.stream()
+                .map(x -> String.format("Rack %s -> scheduled-cnt %d, 
min-avail %f, avg-avail %f, cpu %f, mem %f",
+                        x.id, 
nodeSorter.getScheduledExecCntByRackId().getOrDefault(x.id, new 
AtomicInteger(-1)).get(),
+                        x.minResourcePercent, x.avgResourcePercent,
+                        x.availableResources.getTotalCpu(),
+                        x.availableResources.getTotalMemoryMb()))
+                .collect(Collectors.joining("\n\t"));
+        Assert.assertEquals(rackSummaries + "\n# of racks sorted", 6, 
sortedRacks.size());
+        Iterator<ObjectResourcesItem> it = sortedRacks.iterator();
+        Assert.assertEquals(rackSummaries + "\nrack-000 should be ordered 
first since it has the most balanced set of resources", "rack-000", 
it.next().id);
+        Assert.assertEquals(rackSummaries + "\nrack-001 should be ordered 
second since it has a balanced set of resources but less than rack-000", 
"rack-001", it.next().id);
+        Assert.assertEquals(rackSummaries + "\nrack-004 should be ordered 
third since it has a lot of cpu but not a lot of memory", "rack-004", 
it.next().id);
+        Assert.assertEquals(rackSummaries + "\nrack-003 should be ordered 
fourth since it has a lot of memory but not cpu", "rack-003", it.next().id);
+        Assert.assertEquals(rackSummaries + "\nrack-002 should be ordered 
fifth since it has not cpu resources", "rack-002", it.next().id);
+        Assert.assertEquals(rackSummaries + "\nRack-005 should be ordered 
sixth since it has neither CPU nor memory available", "rack-005", it.next().id);
+    }
+
+    /**
+     * Test whether strategy will choose correct rack.
+     */
+    @Test
+    public void testMultipleRacksWithFavoritism() {
+        final Map<String, SupervisorDetails> supMap = new HashMap<>();
+        final int numRacks = 1;
+        final int numSupersPerRack = 10;
+        final int numPortsPerSuper = 4;
+        final int numZonesPerHost = 2;
+        int rackStartNum = 0;
+        int supStartNum = 0;
+        final Map<String, SupervisorDetails> supMapRack0 = 
genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, 
rackStartNum++, supStartNum,
+                400, 8000, Collections.emptyMap(), 1.0);
+
+        //generate another rack of supervisors with less resources
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack1 = 
genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, 
rackStartNum++, supStartNum,
+                200, 4000, Collections.emptyMap(), 1.0);
+
+        //generate some supervisors that are depleted of one resource
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack2 = 
genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, 
rackStartNum++, supStartNum,
+                0, 8000, Collections.emptyMap(), 1.0);
+
+        //generate some that has a lot of memory but little of cpu
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack3 = 
genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, 
rackStartNum++, supStartNum,
+                10, 8000 * 2 + 4000, Collections.emptyMap(), 1.0);
+
+        //generate some that has a lot of cpu but little of memory
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack4 = 
genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, 
rackStartNum++, supStartNum,
+                400 + 200 + 10, 1000, Collections.emptyMap(), 1.0);
+
+        supMap.putAll(supMapRack0);
+        supMap.putAll(supMapRack1);
+        supMap.putAll(supMapRack2);
+        supMap.putAll(supMapRack3);
+        supMap.putAll(supMapRack4);
+
+        Config config = createClusterConfig(100, 500, 500, null);
+        config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+        INimbus iNimbus = new INimbusTest();
+
+        //create test DNSToSwitchMapping plugin
+        TestDNSToSwitchMapping testDNSToSwitchMapping =
+                new TestDNSToSwitchMapping(supMapRack0, supMapRack1, 
supMapRack2, supMapRack3, supMapRack4);
+
+        Config t1Conf = new Config();
+        t1Conf.putAll(config);
+        final List<String> t1FavoredHostNames = Arrays.asList("host-41", 
"host-42", "host-43");
+        t1Conf.put(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES, 
t1FavoredHostNames);
+        final List<String> t1UnfavoredHostIds = Arrays.asList("host-1", 
"host-2", "host-3");
+        t1Conf.put(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES, 
t1UnfavoredHostIds);
+        //generate topologies
+        TopologyDetails topo1 = genTopology("topo-1", t1Conf, 8, 0, 2, 0, 
CURRENT_TIME - 2, 10, "user");
+
+
+        Config t2Conf = new Config();
+        t2Conf.putAll(config);
+        t2Conf.put(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES, 
Arrays.asList("host-31", "host-32", "host-33"));
+        t2Conf.put(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES, 
Arrays.asList("host-11", "host-12", "host-13"));
+        TopologyDetails topo2 = genTopology("topo-2", t2Conf, 8, 0, 2, 0, 
CURRENT_TIME - 2, 10, "user");
+
+        Topologies topologies = new Topologies(topo1, topo2);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new 
StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+
+        List<String> supHostnames = new LinkedList<>();
+        for (SupervisorDetails sup : supMap.values()) {
+            supHostnames.add(sup.getHost());
+        }
+        Map<String, List<String>> rackToHosts = 
testDNSToSwitchMapping.getRackToHosts();
+        cluster.setNetworkTopography(rackToHosts);
+
+        NodeSorterHostProximity nodeSorter = new 
NodeSorterHostProximity(cluster, topo1, 
BaseResourceAwareStrategy.NodeSortType.DEFAULT_RAS);
+        nodeSorter.prepare(null);
+        List<ObjectResourcesItem> sortedRacks = 
StreamSupport.stream(nodeSorter.getSortedRacks().spliterator(), false)
+                .collect(Collectors.toList());
+        String rackSummaries = sortedRacks.stream()
+                .map(x -> String.format("Rack %s -> scheduled-cnt %d, 
min-avail %f, avg-avail %f, cpu %f, mem %f",
+                        x.id, 
nodeSorter.getScheduledExecCntByRackId().getOrDefault(x.id, new 
AtomicInteger(-1)).get(),
+                        x.minResourcePercent, x.avgResourcePercent,
+                        x.availableResources.getTotalCpu(),
+                        x.availableResources.getTotalMemoryMb()))
+                .collect(Collectors.joining("\n\t"));
+
+        Iterator<ObjectResourcesItem> it = sortedRacks.iterator();
+        // Ranked first since rack-000 has the most balanced set of resources
+        Assert.assertEquals("rack-000 should be ordered first", "rack-000", 
it.next().id);
+        // Ranked second since rack-1 has a balanced set of resources but less 
than rack-0
+        Assert.assertEquals("rack-001 should be ordered second", "rack-001", 
it.next().id);
+        // Ranked third since rack-4 has a lot of cpu but not a lot of memory
+        Assert.assertEquals("rack-004 should be ordered third", "rack-004", 
it.next().id);
+        // Ranked fourth since rack-3 has alot of memory but not cpu
+        Assert.assertEquals("rack-003 should be ordered fourth", "rack-003", 
it.next().id);
+        //Ranked last since rack-2 has not cpu resources
+        Assert.assertEquals("rack-00s2 should be ordered fifth", "rack-002", 
it.next().id);
+    }
+
+    /**
+     * Test if hosts are presented together regardless of resource 
availability.
+     * Supervisors are created with multiple Numa zones in such a manner that 
resources on two numa zones on the same host
+     * differ widely in resource availability.
+     */
+    @Test
+    public void testMultipleRacksWithHostProximity() {
+        final Map<String, SupervisorDetails> supMap = new HashMap<>();
+        final int numRacks = 1;
+        final int numSupersPerRack = 12;
+        final int numPortsPerSuper = 4;
+        final int numZonesPerHost = 3;
+        final double numaResourceMultiplier = 0.4;
+        int rackStartNum = 0;
+        int supStartNum = 0;
+
+        final Map<String, SupervisorDetails> supMapRack0 = 
genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, 
rackStartNum++, supStartNum,
+                400, 8000, Collections.emptyMap(), numaResourceMultiplier);
+
+        //generate another rack of supervisors with less resources
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack1 = 
genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, 
rackStartNum++, supStartNum,
+                200, 4000, Collections.emptyMap(), numaResourceMultiplier);
+
+        //generate some supervisors that are depleted of one resource
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack2 = 
genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, 
rackStartNum++, supStartNum,
+                0, 8000, Collections.emptyMap(), numaResourceMultiplier);
+
+        //generate some that has a lot of memory but little of cpu
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack3 = 
genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, 
rackStartNum++, supStartNum,
+                10, 8000 * 2 + 4000, 
Collections.emptyMap(),numaResourceMultiplier);
+
+        //generate some that has a lot of cpu but little of memory
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack4 = 
genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, 
rackStartNum++, supStartNum,
+                400 + 200 + 10, 1000, Collections.emptyMap(), 
numaResourceMultiplier);
+
+        supMap.putAll(supMapRack0);
+        supMap.putAll(supMapRack1);
+        supMap.putAll(supMapRack2);
+        supMap.putAll(supMapRack3);
+        supMap.putAll(supMapRack4);
+
+        Config config = createClusterConfig(100, 500, 500, null);
+        config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+        INimbus iNimbus = new INimbusTest();
+
+        //create test DNSToSwitchMapping plugin
+        TestDNSToSwitchMapping testDNSToSwitchMapping =
+                new TestDNSToSwitchMapping(supMapRack0, supMapRack1, 
supMapRack2, supMapRack3, supMapRack4);
+
+        Config t1Conf = new Config();
+        t1Conf.putAll(config);
+        final List<String> t1FavoredHostNames = Arrays.asList("host-41", 
"host-42", "host-43");
+        t1Conf.put(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES, 
t1FavoredHostNames);
+        final List<String> t1UnfavoredHostIds = Arrays.asList("host-1", 
"host-2", "host-3");
+        t1Conf.put(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES, 
t1UnfavoredHostIds);
+        //generate topologies
+        TopologyDetails topo1 = genTopology("topo-1", t1Conf, 8, 0, 2, 0, 
CURRENT_TIME - 2, 10, "user");
+
+
+        Config t2Conf = new Config();
+        t2Conf.putAll(config);
+        t2Conf.put(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES, 
Arrays.asList("host-31", "host-32", "host-33"));
+        t2Conf.put(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES, 
Arrays.asList("host-11", "host-12", "host-13"));
+        TopologyDetails topo2 = genTopology("topo-2", t2Conf, 8, 0, 2, 0, 
CURRENT_TIME - 2, 10, "user");
+
+        Topologies topologies = new Topologies(topo1, topo2);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new 
StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+
+        cluster.setNetworkTopography(testDNSToSwitchMapping.getRackToHosts());
+
+        INodeSorter nodeSorter = new NodeSorterHostProximity(cluster, topo1);
+        nodeSorter.prepare(null);
+
+        Set<String> seenHosts = new HashSet<>();
+        String prevHost = null;
+        List<String> errLines = new ArrayList();
+        Map<String, String> nodeToHost = new 
RasNodes(cluster).getNodeIdToHostname();
+        for (String nodeId: nodeSorter.sortAllNodes()) {
+            String host = nodeToHost.getOrDefault(nodeId, "no-host-for-node-" 
+ nodeId);
+            errLines.add(String.format("\tnodeId:%s, host:%s", nodeId, host));
+            if (!host.equals(prevHost) && seenHosts.contains(host)) {
+                String err = String.format("Host %s for node %s is out of 
order:\n\t%s", host, nodeId, String.join("\n\t", errLines));
+                Assert.fail(err);
+            }
+            seenHosts.add(host);
+            prevHost = host;
+        }
+    }
+
+    /**
+     * Racks should be returned in order of decreasing capacity.
+     */
+    @Test
+    public void testMultipleRacksOrderedByCapacity() {
+        final Map<String, SupervisorDetails> supMap = new HashMap<>();
+        final int numRacks = 1;
+        final int numSupersPerRack = 10;
+        final int numPortsPerSuper = 4;
+        final int numZonesPerHost = 1;
+        final double numaResourceMultiplier = 1.0;
+        int rackStartNum = 0;
+        int supStartNum = 0;
+
+        final Map<String, SupervisorDetails> supMapRack0 = 
genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, 
rackStartNum++, supStartNum,
+                600, 8000 - rackStartNum, Collections.emptyMap(), 
numaResourceMultiplier);
+
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack1 = 
genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, 
rackStartNum++, supStartNum,
+                500, 8000 - rackStartNum, Collections.emptyMap(), 
numaResourceMultiplier);
+
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack2 = 
genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, 
rackStartNum++, supStartNum,
+                400, 8000 - rackStartNum, Collections.emptyMap(), 
numaResourceMultiplier);
+
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack3 = 
genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, 
rackStartNum++, supStartNum,
+                300, 8000 - rackStartNum, 
Collections.emptyMap(),numaResourceMultiplier);
+
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack4 = 
genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, 
rackStartNum++, supStartNum,
+                200, 8000 - rackStartNum, Collections.emptyMap(), 
numaResourceMultiplier);
+
+        // too small to hold topology
+        supStartNum += numSupersPerRack;
+        final Map<String, SupervisorDetails> supMapRack5 = 
genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, 
rackStartNum++, supStartNum,
+                100, 8000 - rackStartNum, 
Collections.singletonMap("gpu.count", 0.0), numaResourceMultiplier);
+
+        supMap.putAll(supMapRack0);
+        supMap.putAll(supMapRack1);
+        supMap.putAll(supMapRack2);
+        supMap.putAll(supMapRack3);
+        supMap.putAll(supMapRack4);
+        supMap.putAll(supMapRack5);
+
+        Config config = createClusterConfig(100, 500, 500, null);
+        config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+        INimbus iNimbus = new INimbusTest();
+
+        //create test DNSToSwitchMapping plugin
+        TestDNSToSwitchMapping testDNSToSwitchMapping =
+                new TestDNSToSwitchMapping(supMapRack0, supMapRack1, 
supMapRack2, supMapRack3, supMapRack4, supMapRack5);
+
+        //generate topologies
+        TopologyDetails topo1 = genTopology("topo-1", config, 8, 0, 2, 0, 
CURRENT_TIME - 2, 10, "user");
+        TopologyDetails topo2 = genTopology("topo-2", config, 8, 0, 2, 0, 
CURRENT_TIME - 2, 10, "user");
+
+        Topologies topologies = new Topologies(topo1, topo2);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new 
StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+
+        cluster.setNetworkTopography(testDNSToSwitchMapping.getRackToHosts());
+
+        NodeSorterHostProximity nodeSorter = new 
NodeSorterHostProximity(cluster, topo1);
+        nodeSorter.prepare(null);
+        List<ObjectResourcesItem> sortedRacks = 
StreamSupport.stream(nodeSorter.getSortedRacks().spliterator(), false)
+                .collect(Collectors.toList());
+        String rackSummaries = sortedRacks
+                .stream()
+                .map(x -> String.format("Rack %s -> scheduled-cnt %d, 
min-avail %f, avg-avail %f, cpu %f, mem %f",
+                        x.id, 
nodeSorter.getScheduledExecCntByRackId().getOrDefault(x.id, new 
AtomicInteger(-1)).get(),
+                        x.minResourcePercent, x.avgResourcePercent,
+                        x.availableResources.getTotalCpu(),
+                        x.availableResources.getTotalMemoryMb()))
+                .collect(Collectors.joining("\n\t"));
+        NormalizedResourceRequest topoResourceRequest = 
topo1.getApproximateTotalResources();
+        String topoRequest = String.format("Topo %s, 
approx-requested-resources %s", topo1.getId(), topoResourceRequest.toString());
+        Iterator<ObjectResourcesItem> it = sortedRacks.iterator();
+        Assert.assertEquals(topoRequest + "\n\t" + rackSummaries + "\nRack-000 
should be ordered first since it has the largest capacity", "rack-000", 
it.next().id);
+        Assert.assertEquals(topoRequest + "\n\t" + rackSummaries + "\nrack-001 
should be ordered second since it smaller than rack-000", "rack-001", 
it.next().id);
+        Assert.assertEquals(topoRequest + "\n\t" + rackSummaries + "\nrack-002 
should be ordered third since it is smaller than rack-001", "rack-002", 
it.next().id);
+        Assert.assertEquals(topoRequest + "\n\t" + rackSummaries + "\nrack-003 
should be ordered fourth since it since it is smaller than rack-002", 
"rack-003", it.next().id);
+        Assert.assertEquals(topoRequest + "\n\t" + rackSummaries + "\nrack-004 
should be ordered fifth since it since it is smaller than rack-003", 
"rack-004", it.next().id);
+        Assert.assertEquals(topoRequest + "\n\t" + rackSummaries + "\nrack-005 
should be ordered last since it since it is has smallest capacity", "rack-005", 
it.next().id);
+    }
+
+    /**
+     * Schedule two topologies, one with special resources and another without.
+     * There are enough special resources to hold one topology with special resource ("my.gpu").
+     * If the sort order is incorrect, scheduling will not succeed.
+     *
+     * <p>Before each scheduling round the rack order produced by {@link NodeSorterHostProximity} is
+     * checked per executor: executors of the "gpu-bolt" component must see rack-001 first, all other
+     * executors must see rack-000 first.
+     */
+    @Test
+    public void testAntiAffinityWithMultipleTopologies() {
+        INimbus iNimbus = new INimbusTest();
+        Map<String, SupervisorDetails> supMap = genSupervisorsWithRacks(1, 40, 66, 0, 0, 4700, 226200, new HashMap<>());
+        HashMap<String, Double> extraResources = new HashMap<>();
+        extraResources.put("my.gpu", 1.0);
+        supMap.putAll(genSupervisorsWithRacks(1, 40, 66, 1, 0, 4700, 226200, extraResources));
+
+        Config config = new Config();
+        config.putAll(createGrasClusterConfig(88, 775, 25, null, null));
+
+        IScheduler scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(config, new StormMetricsRegistry());
+
+        TopologyDetails tdSimple = genTopology("topology-simple", config, 1,
+                5, 100, 300, 0, 0, "user", 8192);
+
+        //Schedule the simple topology first
+        Topologies topologies = new Topologies(tdSimple);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()),
+                supMap, new HashMap<>(), topologies, config);
+
+        {
+            NodeSorterHostProximity nodeSorter = new NodeSorterHostProximity(cluster, tdSimple);
+            for (ExecutorDetails exec : tdSimple.getExecutors()) {
+                nodeSorter.prepare(exec);
+                List<ObjectResourcesItem> sortedRacks = StreamSupport
+                        .stream(nodeSorter.getSortedRacks().spliterator(), false)
+                        .collect(Collectors.toList());
+                String rackSummaries = sortedRacks.stream()
+                        .map(x -> String.format("Rack %s -> scheduled-cnt %d, min-avail %f, avg-avail %f, cpu %f, mem %f",
+                                x.id, nodeSorter.getScheduledExecCntByRackId().getOrDefault(x.id, new AtomicInteger(-1)).get(),
+                                x.minResourcePercent, x.avgResourcePercent,
+                                x.availableResources.getTotalCpu(),
+                                x.availableResources.getTotalMemoryMb()))
+                        .collect(Collectors.joining("\n\t"));
+                NormalizedResourceRequest topoResourceRequest = tdSimple.getApproximateTotalResources();
+                String topoRequest = String.format("Topo %s, approx-requested-resources %s",
+                        tdSimple.getId(), topoResourceRequest.toString());
+                // Failure-message prefix: what the topology asked for and what each rack offered.
+                String context = topoRequest + "\n\t" + rackSummaries;
+                Assert.assertEquals(context + "\n# of racks sorted", 2, sortedRacks.size());
+                Assert.assertEquals(context + "\nFirst rack sorted", "rack-000", sortedRacks.get(0).id);
+                Assert.assertEquals(context + "\nSecond rack sorted", "rack-001", sortedRacks.get(1).id);
+            }
+        }
+
+        scheduler.schedule(topologies, cluster);
+
+        TopologyBuilder builder = topologyBuilder(1, 5, 100, 300);
+        builder.setBolt("gpu-bolt", new TestBolt(), 40)
+                .addResource("my.gpu", 1.0)
+                .shuffleGrouping("spout-0");
+        TopologyDetails tdGpu = topoToTopologyDetails("topology-gpu", config, builder.createTopology(), 0, 0, "user", 8192);
+
+        //Now schedule GPU but with the simple topology in place.
+        topologies = new Topologies(tdSimple, tdGpu);
+        cluster = new Cluster(cluster, topologies);
+        {
+            NodeSorterHostProximity nodeSorter = new NodeSorterHostProximity(cluster, tdGpu);
+            for (ExecutorDetails exec : tdGpu.getExecutors()) {
+                String comp = tdGpu.getComponentFromExecutor(exec);
+                nodeSorter.prepare(exec);
+                List<ObjectResourcesItem> sortedRacks = StreamSupport
+                        .stream(nodeSorter.getSortedRacks().spliterator(), false).collect(Collectors.toList());
+                String rackSummaries = sortedRacks.stream()
+                        .map(x -> String.format("Rack %s -> scheduled-cnt %d, min-avail %f, avg-avail %f, cpu %f, mem %f",
+                                x.id, nodeSorter.getScheduledExecCntByRackId().getOrDefault(x.id, new AtomicInteger(-1)).get(),
+                                x.minResourcePercent, x.avgResourcePercent,
+                                x.availableResources.getTotalCpu(),
+                                x.availableResources.getTotalMemoryMb()))
+                        .collect(Collectors.joining("\n\t"));
+                // Describe the topology actually being sorted for in this phase (the GPU topology).
+                NormalizedResourceRequest topoResourceRequest = tdGpu.getApproximateTotalResources();
+                String topoRequest = String.format("Topo %s, approx-requested-resources %s",
+                        tdGpu.getId(), topoResourceRequest.toString());
+                String context = topoRequest + "\n\t" + rackSummaries;
+                Assert.assertEquals(context + "\n# of racks sorted", 2, sortedRacks.size());
+                if (comp.equals("gpu-bolt")) {
+                    Assert.assertEquals(context + "\nFirst rack sorted for " + comp, "rack-001", sortedRacks.get(0).id);
+                    Assert.assertEquals(context + "\nSecond rack sorted for " + comp, "rack-000", sortedRacks.get(1).id);
+                } else {
+                    Assert.assertEquals(context + "\nFirst rack sorted for " + comp, "rack-000", sortedRacks.get(0).id);
+                    Assert.assertEquals(context + "\nSecond rack sorted for " + comp, "rack-001", sortedRacks.get(1).id);
+                }
+            }
+        }
+
+        scheduler.schedule(topologies, cluster);
+
+        Map<String, SchedulerAssignment> assignments = new TreeMap<>(cluster.getAssignments());
+        assertEquals(2, assignments.size());
+
+        // topology id -> (rack name -> number of worker slots the topology uses on that rack)
+        Map<String, Map<String, AtomicLong>> topoPerRackCount = new HashMap<>();
+        for (Map.Entry<String, SchedulerAssignment> entry : assignments.entrySet()) {
+            SchedulerAssignment sa = entry.getValue();
+            Map<String, AtomicLong> slotsPerRack = new TreeMap<>();
+            for (WorkerSlot slot : sa.getSlots()) {
+                String nodeId = slot.getNodeId();
+                String rack = supervisorIdToRackName(nodeId);
+                slotsPerRack.computeIfAbsent(rack, (r) -> new AtomicLong(0)).incrementAndGet();
+            }
+            LOG.info("{} => {}", entry.getKey(), slotsPerRack);
+            topoPerRackCount.put(entry.getKey(), slotsPerRack);
+        }
+
+        Map<String, AtomicLong> simpleCount = topoPerRackCount.get("topology-simple-0");
+        assertNotNull(simpleCount);
+        //Because the simple topology was scheduled first we want to be sure that it didn't put anything on
+        // the GPU nodes.
+        assertEquals(1, simpleCount.size()); //Only 1 rack is in use
+        assertFalse(simpleCount.containsKey("r001")); //r001 is the second rack with GPUs
+        assertTrue(simpleCount.containsKey("r000")); //r000 is the first rack with no GPUs
+
+        //We don't really care too much about the scheduling of topology-gpu-0, because it was scheduled.
+    }
+
+    /**
+     * Releases every fifth worker slot of each assignment in the cluster, i.e. about
+     * one-fifth of the slots each assignment occupies.
+     *
+     * @param cluster cluster whose assignments are scanned and whose slots are freed
+     */
+    private void freeSomeWorkerSlots(Cluster cluster) {
+        for (SchedulerAssignment assignment : cluster.getAssignments().values()) {
+            List<WorkerSlot> everyFifthSlot = new ArrayList<>();
+            int position = 0;
+            for (WorkerSlot slot : assignment.getSlots()) {
+                // positions are 1-based, so the 5th, 10th, 15th, ... slots are picked
+                if (++position % 5 != 0) {
+                    continue;
+                }
+                everyFifthSlot.add(slot);
+            }
+            cluster.freeSlots(everyFifthSlot);
+        }
+    }
+
+    /**
+     * A topology that does not fit on a single rack must overflow onto the next rack, and only that one.
+     *
+     * <p>Each supervisor is sized so that the ten supervisors of a rack together hold four-fifths of the
+     * topology's executors; the scheduled topology is therefore expected to occupy exactly two racks.
+     */
+    @Test
+    public void testFillUpRackAndSpilloverToNextRack() {
+        // per-component resource defaults
+        final double cpuPerComp = 100;
+        final double onHeapPerComp = 775;
+        final double offHeapPerComp = 25;
+
+        // shape of the topology
+        final String topologyName = "topology1";
+        final int spoutCount = 1;
+        final int boltCount = 5;
+        final int spoutParallelism = 100;
+        final int boltParallelism = 200;
+
+        // shape of the cluster
+        final int rackCount = 3;
+        final int supervisorsPerRack = 10;
+        final int portsPerSupervisor = 6;
+        final int zonesPerHost = 1;
+        final double numaMultiplier = 1.0;
+        final int firstRackNum = 0;
+        final int firstSupervisorNum = 0;
+
+        // capacity: four-fifths of the executors per rack, which is not enough for the topology
+        final int totalComps = spoutCount * spoutParallelism + boltCount * boltParallelism;
+        final long compsPerRack = totalComps * 4 / 5;
+        final long compsPerSupervisor = compsPerRack / supervisorsPerRack;
+        final double supervisorCpu = cpuPerComp * compsPerSupervisor;
+        final double supervisorMem = (onHeapPerComp + offHeapPerComp) * compsPerSupervisor;
+        final double maxHeapSize = supervisorMem;
+
+        Map<String, SupervisorDetails> supervisors = genSupervisorsWithRacksAndNuma(
+                rackCount, supervisorsPerRack, zonesPerHost, portsPerSupervisor, firstRackNum, firstSupervisorNum,
+                supervisorCpu, supervisorMem, Collections.emptyMap(), numaMultiplier);
+        TestDNSToSwitchMapping rackMapping = new TestDNSToSwitchMapping(supervisors.values());
+
+        Config conf = new Config();
+        conf.putAll(createGrasClusterConfig(cpuPerComp, onHeapPerComp, offHeapPerComp, null, null));
+        conf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, GenericResourceAwareStrategy.class.getName());
+
+        IScheduler ras = new ResourceAwareScheduler();
+        ras.prepare(conf, new StormMetricsRegistry());
+
+        TopologyDetails topology = genTopology(topologyName, conf, spoutCount,
+                boltCount, spoutParallelism, boltParallelism, 0, 0, "user", maxHeapSize);
+
+        // Schedule the topology and verify that it ends up on two racks
+        Topologies allTopologies = new Topologies(topology);
+        INimbus nimbus = new INimbusTest();
+        Cluster cluster = new Cluster(nimbus, new ResourceMetrics(new StormMetricsRegistry()),
+                supervisors, new HashMap<>(), allTopologies, conf);
+        cluster.setNetworkTopography(rackMapping.getRackToHosts());
+
+        ras.schedule(allTopologies, cluster);
+
+        Set<String> racksInUse = cluster.getAssignedRacks(topology.getId());
+        String message = "Racks for topology=" + topology.getId() + " is " + racksInUse;
+        assertEquals(message, 2, racksInUse.size());
+    }
+
+    /**
+     * Rack with low resources should be used to schedule an executor if it 
has other executors for the same topology.
+     * <li>Schedule topo1 on one rack</li>
+     * <li>unassign some executors</li>
+     * <li>schedule another topology to partially fill up rack1</li>
+     * <li>Add another rack and schedule topology 1 remaining executors 
again</li>
+     * <li>scheduling should utilize all resources on rack1 before trying next rack</li>
+     */
+    @Test
+    public void testPreferRackWithTopoExecutors() {
+        INimbus iNimbus = new INimbusTest();
+        double compPcore = 100;
+        double compOnHeap = 775;
+        double compOffHeap = 25;
+        int topo1NumSpouts = 1;
+        int topo1NumBolts = 5;
+        int topo1SpoutParallelism = 100;
+        int topo1BoltParallelism = 200;
+        int topo2NumSpouts = 1;
+        int topo2NumBolts = 5;
+        int topo2SpoutParallelism = 10;
+        int topo2BoltParallelism = 20;
+        final int numRacks = 3;
+        final int numSupersPerRack = 10;
+        final int numPortsPerSuper = 6;
+        final int numZonesPerHost = 1;
+        final double numaResourceMultiplier = 1.0;
+        int rackStartNum = 0;
+        int supStartNum = 0;
+        long compPerRack = (topo1NumSpouts * topo1SpoutParallelism + 
topo1NumBolts * topo1BoltParallelism
+                + topo2NumSpouts * topo2SpoutParallelism); // enough for topo1 
but not topo1+topo2
+        long compPerSuper =  compPerRack / numSupersPerRack;
+        double cpuPerSuper = compPcore * compPerSuper;
+        double memPerSuper = (compOnHeap + compOffHeap) * compPerSuper;
+        double topo1MaxHeapSize = memPerSuper;
+        double topo2MaxHeapSize = memPerSuper;
+        final String topoName1 = "topology1";
+        final String topoName2 = "topology2";
+
+        Map<String, SupervisorDetails> supMap = genSupervisorsWithRacksAndNuma(
+                numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper, 
rackStartNum, supStartNum,
+                cpuPerSuper, memPerSuper, Collections.emptyMap(), 
numaResourceMultiplier);
+        TestDNSToSwitchMapping testDNSToSwitchMapping = new 
TestDNSToSwitchMapping(supMap.values());
+
+        Config config = new Config();
+        config.putAll(createGrasClusterConfig(compPcore, compOnHeap, 
compOffHeap, null, null));
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, 
GenericResourceAwareStrategy.class.getName());
+
+        IScheduler scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(config, new StormMetricsRegistry());
+
+        TopologyDetails td1 = genTopology(topoName1, config, topo1NumSpouts,
+                topo1NumBolts, topo1SpoutParallelism, topo1BoltParallelism, 0, 
0, "user", topo1MaxHeapSize);
+
+        //Schedule the topo1 topology and ensure it fits on 1 rack
+        Topologies topologies = new Topologies(td1);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new 
StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+        cluster.setNetworkTopography(testDNSToSwitchMapping.getRackToHosts());
+
+        scheduler.schedule(topologies, cluster);
+        Set<String> assignedRacks = cluster.getAssignedRacks(td1.getId());
+        assertEquals("Racks for topology=" + td1.getId() + " is " + 
assignedRacks, 1, assignedRacks.size());
+
+        TopologyBuilder builder = topologyBuilder(topo2NumSpouts, 
topo2NumBolts, topo2SpoutParallelism, topo2BoltParallelism);
+        TopologyDetails td2 = topoToTopologyDetails(topoName2, config, 
builder.createTopology(), 0, 0,"user", topo2MaxHeapSize);
+
+        //Now schedule GPU but with the simple topology in place.
+        topologies = new Topologies(td1, td2);
+        cluster = new Cluster(cluster, topologies);
+        scheduler.schedule(topologies, cluster);
+
+        assignedRacks = cluster.getAssignedRacks(td1.getId(), td2.getId());
+        assertEquals("Racks for topologies=" + td1.getId() + "/" + td2.getId() 
+ " is " + assignedRacks, 2, assignedRacks.size());
+
+        // topo2 gets scheduled on its own rack because it is empty and 
available
+        assignedRacks = cluster.getAssignedRacks(td2.getId());
+        assertEquals("Racks for topologies=" + td2.getId() + " is " + 
assignedRacks, 1, assignedRacks.size());
+
+        // now unassign topo2, expect only one rack to be in use; free some 
slots and reschedule topo1 some topo1 executors
+        cluster.unassign(td2.getId());
+        assignedRacks = cluster.getAssignedRacks(td2.getId());
+        assertEquals("After unassigning topology " + td2.getId() + ", racks 
for topology=" + td2.getId() + " is " + assignedRacks,
+                0, assignedRacks.size());
+        assignedRacks = cluster.getAssignedRacks(td1.getId());
+        assertEquals("After unassigning topology " + td2.getId() + ", racks 
for topology=" + td1.getId() + " is " + assignedRacks,
+                1, assignedRacks.size());
+        assertFalse("Topology " + td1.getId() + " should be fully assigned 
before freeing slots", cluster.needsSchedulingRas(td1));
+        freeSomeWorkerSlots(cluster);
+        assertTrue("Topology " + td1.getId() + " should need scheduling after 
freeing slots", cluster.needsSchedulingRas(td1));
+
+        // then reschedule executors
+        scheduler.schedule(topologies, cluster);
+
+        // only one rack should be in use by topology1
+        assignedRacks = cluster.getAssignedRacks(td1.getId());
+        assertEquals("After reassigning topology " + td2.getId() + ", racks 
for topology=" + td1.getId() + " is " + assignedRacks,
+                1, assignedRacks.size());
+    }
+
+    /**
+     * Assign and then clear out a rack to host list mapping in cluster.networkTopography.
+     * Expected behavior is that:
+     * <ul>
+     *  <li>the rack without hosts does not show up in {@link NodeSorterHostProximity#getSortedRacks()}</li>
+     *  <li>all the supervisor nodes still get returned in {@link NodeSorterHostProximity#sortAllNodes()}</li>
+     *  <li>supervisors on cleared rack show up under {@link DNSToSwitchMapping#DEFAULT_RACK}</li>
+     * </ul>
+     *
+     *  <p>
+     *      Force an unusual condition, where one of the racks is still passed to LazyNodeSortingIterator with
+     *      an empty list and then ensure that code is resilient.
+     *  </p>
+     *
+     *  <p>
+     *      Finally, rebuild the cluster and additionally remove the supervisors of the cleared rack; in that
+     *      case one rack fewer is expected and {@link DNSToSwitchMapping#DEFAULT_RACK} must not appear.
+     *  </p>
+     */
+    @Test
+    void testWithImpairedClusterNetworkTopography() {
+        INimbus iNimbus = new INimbusTest();
+        double compPcore = 100;
+        double compOnHeap = 775;
+        double compOffHeap = 25;
+        int topo1NumSpouts = 1;
+        int topo1NumBolts = 5;
+        int topo1SpoutParallelism = 100;
+        int topo1BoltParallelism = 200;
+        final int numSupersPerRack = 10;
+        final int numPortsPerSuper = 66;
+        long compPerRack = (topo1NumSpouts * topo1SpoutParallelism + 
topo1NumBolts * topo1BoltParallelism + 10);
+        long compPerSuper =  compPerRack / numSupersPerRack;
+        double cpuPerSuper = compPcore * compPerSuper;
+        double memPerSuper = (compOnHeap + compOffHeap) * compPerSuper;
+        double topo1MaxHeapSize = memPerSuper;
+        final String topoName1 = "topology1";
+        int numRacks = 3;
+
+        Map<String, SupervisorDetails> supMap = 
genSupervisorsWithRacks(numRacks, numSupersPerRack,  numPortsPerSuper,
+            0, 0, cpuPerSuper, memPerSuper, new HashMap<>());
+        TestDNSToSwitchMapping testDNSToSwitchMapping = new 
TestDNSToSwitchMapping(supMap.values());
+
+        Config config = new Config();
+        config.putAll(createGrasClusterConfig(compPcore, compOnHeap, 
compOffHeap, null, null));
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, 
GenericResourceAwareStrategy.class.getName());
+
+        IScheduler scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(config, new StormMetricsRegistry());
+
+        TopologyDetails td1 = genTopology(topoName1, config, topo1NumSpouts,
+            topo1NumBolts, topo1SpoutParallelism, topo1BoltParallelism, 0, 0, 
"user", topo1MaxHeapSize);
+
+        Topologies topologies = new Topologies(td1);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new 
StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+        cluster.setNetworkTopography(testDNSToSwitchMapping.getRackToHosts());
+
+        Map<String, List<String>> networkTopography = 
cluster.getNetworkTopography();
+        assertEquals("Expecting " + numRacks + " racks found " + 
networkTopography.size(), numRacks, networkTopography.size());
+        assertTrue("Expecting racks count to be >= 3, found " + 
networkTopography.size(), networkTopography.size() >= 3);
+
+        // Impair cluster.networkTopography and set one rack to have zero 
hosts, getSortedRacks should exclude this rack.
+        // Keep, the supervisorDetails unchanged - confirm that these nodes 
are not lost even with incomplete networkTopography
+        String rackIdToZero = 
networkTopography.keySet().stream().findFirst().get();
+        impairClusterRack(cluster, rackIdToZero, true, false);
+
+        NodeSorterHostProximity nodeSorterHostProximity = new 
NodeSorterHostProximity(cluster, td1);
+        nodeSorterHostProximity.getSortedRacks().forEach(x -> 
assertNotEquals(x.id, rackIdToZero));
+
+        // confirm that the above action has not lost the hosts and that they 
appear under the DEFAULT rack
+        {
+            Set<String> seenRacks = new HashSet<>();
+            nodeSorterHostProximity.getSortedRacks().forEach(x -> 
seenRacks.add(x.id));
+            assertEquals("Expecting rack cnt to be still " + numRacks, 
numRacks, seenRacks.size());
+            assertTrue("Expecting to see default-rack=" + 
DNSToSwitchMapping.DEFAULT_RACK + " in sortedRacks",
+                seenRacks.contains(DNSToSwitchMapping.DEFAULT_RACK));
+        }
+
+        // now check that no node/supervisor is missing when sorting all nodes
+        Set<String> expectedNodes = supMap.keySet();
+        Set<String> seenNodes = new HashSet<>();
+        nodeSorterHostProximity.prepare(null);
+        nodeSorterHostProximity.sortAllNodes().forEach( n -> seenNodes.add(n));
+        assertEquals("Expecting see all supervisors ", expectedNodes, 
seenNodes);
+
+        // Now fully impair a fresh cluster (topography and supervisors) - confirm no default rack
+        {
+            cluster = new Cluster(iNimbus, new ResourceMetrics(new 
StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+            cluster.setNetworkTopography(new 
TestDNSToSwitchMapping(supMap.values()).getRackToHosts());
+            impairClusterRack(cluster, rackIdToZero, true, true);
+            Set<String> seenRacks = new HashSet<>();
+            NodeSorterHostProximity nodeSorterHostProximity2 = new 
NodeSorterHostProximity(cluster, td1);
+            nodeSorterHostProximity2.getSortedRacks().forEach(x -> 
seenRacks.add(x.id));
+            Map<String, Set<String>> rackIdToHosts = 
nodeSorterHostProximity2.getRackIdToHosts();
+            String dumpOfRacks = rackIdToHosts.entrySet().stream()
+                .map(x -> String.format("rack %s -> hosts [%s]", x.getKey(), 
String.join(",", x.getValue())))
+                .collect(Collectors.joining("\n\t"));
+            assertEquals("Expecting rack cnt to be " + (numRacks - 1) + " but 
found " + seenRacks.size() + "\n\t" + dumpOfRacks,
+                numRacks - 1, seenRacks.size());
+            assertFalse("Found default-rack=" + 
DNSToSwitchMapping.DEFAULT_RACK + " in \n\t" + dumpOfRacks,
+                seenRacks.contains(DNSToSwitchMapping.DEFAULT_RACK));
+        }
+    }
+
+    /**
+     * Black list hosts before sorting nodes and confirm that
+     * {@link NodeSorterHostProximity#sortAllNodes()} still works.
+     * <ul>
+     *  <li>first blacklist the hosts of the first {@code numSupersPerRack} supervisors, taken in the
+     *  iteration order of the supervisor map values</li>
+     *  <li>then blacklist all hosts of one rack taken from the cluster network topography</li>
+     * </ul>
+     * Each blacklist is checked through {@code blacklistHostsAndSortNodes}.
+     */
+    @Test
+    void testWithBlackListedHosts() {
+        INimbus iNimbus = new INimbusTest();
+        double compPcore = 100;
+        double compOnHeap = 775;
+        double compOffHeap = 25;
+        int topo1NumSpouts = 1;
+        int topo1NumBolts = 5;
+        int topo1SpoutParallelism = 100;
+        int topo1BoltParallelism = 200;
+        final int numSupersPerRack = 10;
+        final int numPortsPerSuper = 66;
+        long compPerRack = (topo1NumSpouts * topo1SpoutParallelism + 
topo1NumBolts * topo1BoltParallelism + 10);
+        long compPerSuper =  compPerRack / numSupersPerRack;
+        double cpuPerSuper = compPcore * compPerSuper;
+        double memPerSuper = (compOnHeap + compOffHeap) * compPerSuper;
+        double topo1MaxHeapSize = memPerSuper;
+        final String topoName1 = "topology1";
+        int numRacks = 3;
+
+        Map<String, SupervisorDetails> supMap = 
genSupervisorsWithRacks(numRacks, numSupersPerRack,  numPortsPerSuper,
+            0, 0, cpuPerSuper, memPerSuper, new HashMap<>());
+        TestDNSToSwitchMapping testDNSToSwitchMapping = new 
TestDNSToSwitchMapping(supMap.values());
+
+        Config config = new Config();
+        config.putAll(createGrasClusterConfig(compPcore, compOnHeap, 
compOffHeap, null, null));
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, 
GenericResourceAwareStrategy.class.getName());
+
+        IScheduler scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(config, new StormMetricsRegistry());
+
+        TopologyDetails td1 = genTopology(topoName1, config, topo1NumSpouts,
+            topo1NumBolts, topo1SpoutParallelism, topo1BoltParallelism, 0, 0, 
"user", topo1MaxHeapSize);
+
+        Topologies topologies = new Topologies(td1);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new 
StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+        cluster.setNetworkTopography(testDNSToSwitchMapping.getRackToHosts());
+
+        Map<String, List<String>> networkTopography = 
cluster.getNetworkTopography();
+        assertEquals("Expecting " + numRacks + " racks found " + 
networkTopography.size(), numRacks, networkTopography.size());
+        assertTrue("Expecting racks count to be >= 3, found " + 
networkTopography.size(), networkTopography.size() >= 3);
+
+        Set<String> blackListedHosts = new HashSet<>();
+        List<SupervisorDetails> supArray = new ArrayList<>(supMap.values());
+        for (int i = 0 ; i < numSupersPerRack ; i++) {
+            blackListedHosts.add(supArray.get(i).getHost());
+        }
+        blacklistHostsAndSortNodes(blackListedHosts, supMap.values(), cluster, 
td1);
+
+        String rackToClear = 
cluster.getNetworkTopography().keySet().stream().findFirst().get();
+        blackListedHosts = new 
HashSet<>(cluster.getNetworkTopography().get(rackToClear));
+        blacklistHostsAndSortNodes(blackListedHosts, supMap.values(), cluster, 
td1);
+    }
+
+    /**
+     * Impair the cluster by blacklisting some hosts, then verify that the node sorter
+     * loses exactly those hosts and still returns every remaining supervisor.
+     *
+     * @param blackListedHosts hosts to set as blacklisted on the cluster
+     * @param sups all supervisors used by the test
+     * @param cluster cluster on which the blacklist is set
+     * @param td1 topology handed to the node sorter
+     */
+    private void blacklistHostsAndSortNodes(
+        Set<String> blackListedHosts, Collection<SupervisorDetails> sups, Cluster cluster, TopologyDetails td1) {
+        LOG.info("blackListedHosts={}", blackListedHosts);
+        cluster.setBlacklistedHosts(blackListedHosts);
+        NodeSorterHostProximity sorter = new NodeSorterHostProximity(cluster, td1);
+
+        // hosts known to the supervisors but missing from the sorter's racks
+        // are compared against the blacklist
+        Set<String> missingHosts = new HashSet<>();
+        for (SupervisorDetails sup : sups) {
+            missingHosts.add(sup.getHost());
+        }
+        Set<String> rackIds = new HashSet<>();
+        sorter.getSortedRacks().forEach(rack -> rackIds.add(rack.id));
+        Set<String> hostsOnRacks = new HashSet<>();
+        for (Set<String> rackHosts : sorter.getRackIdToHosts().values()) {
+            hostsOnRacks.addAll(rackHosts);
+        }
+        missingHosts.removeAll(hostsOnRacks);
+        assertEquals("Expecting only blacklisted hosts removed", missingHosts, blackListedHosts);
+
+        // sortAllNodes must still yield every supervisor whose host is not blacklisted
+        Set<String> expectedNodes = new HashSet<>();
+        for (SupervisorDetails sup : sups) {
+            if (!blackListedHosts.contains(sup.getHost())) {
+                expectedNodes.add(sup.getId());
+            }
+        }
+        Set<String> seenNodes = new HashSet<>();
+        sorter.prepare(null);
+        sorter.sortAllNodes().forEach(seenNodes::add);
+        assertEquals("Expecting see all supervisors ", expectedNodes, seenNodes);
+    }
+
+    /**
+     * Damage the cluster's view of a single rack. Depending on the flags this
+     * <ul>
+     *  <li>empties (but keeps) the host list registered for the rack, and/or</li>
+     *  <li>removes the supervisors running on the hosts that were listed for the rack.</li>
+     * </ul>
+     *
+     * @param cluster cluster to impair
+     * @param rackId rackId to clear
+     * @param clearNetworkTopography if true, then clear (but not remove) the hosts in list for the rack.
+     * @param clearSupervisorMap if true, then remove supervisors for the rack.
+     */
+    private void impairClusterRack(Cluster cluster, String rackId, boolean clearNetworkTopography, boolean clearSupervisorMap) {
+        // snapshot the rack's hosts first, because the list itself may be emptied below
+        Set<String> rackHosts = new HashSet<>(
+            cluster.getNetworkTopography().computeIfAbsent(rackId, k -> new ArrayList<>()));
+        if (clearNetworkTopography) {
+            cluster.getNetworkTopography().computeIfAbsent(rackId, k -> new ArrayList<>()).clear();
+        }
+        if (!clearSupervisorMap) {
+            return;
+        }
+
+        // collect the ids of every supervisor on the rack's hosts, then drop them from the cluster
+        Set<String> doomedSupervisorIds = new HashSet<>();
+        for (String host : rackHosts) {
+            cluster.getSupervisorsByHost(host).forEach(sup -> doomedSupervisorIds.add(sup.getId()));
+        }
+        Map<String, SupervisorDetails> supervisors = cluster.getSupervisors();
+        doomedSupervisorIds.forEach(supervisors::remove);
+    }
+}
\ No newline at end of file

Reply via email to