This is an automated email from the ASF dual-hosted git repository.
bipinprasad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new f1716be [STORM-3739] Scheduling should sort numa zones by host groups
(#3379)
f1716be is described below
commit f1716be3d630c44c93787500af9cee427652f548
Author: Bipin Prasad <[email protected]>
AuthorDate: Mon Mar 15 12:18:35 2021 -0500
[STORM-3739] Scheduling should sort numa zones by host groups (#3379)
* [STORM-3739] Scheduling should sort numa zones by host groups
* [YSTORM-3739] Ignore blacklisted hosts and corresponding
nodes/supervisors.
* [STORM-3739] Javadoc fix.
* [STORM-3739] Add check for null hostname, remove unused code, set
totalResources.
* [STORM-3739] Remove unused methods.
* [STORM-3739] Sort hosts comparing average resources before comparing min
resources.
* [STORM-3739] Add dead node check.
* [YSTORM-3739] NodeSortType changes and revert.
* [STORM-3739] Removed unused/commented code.
Co-authored-by: Bipin Prasad <[email protected]>
---
.../apache/storm/scheduler/ISchedulingState.java | 42 +
.../apache/storm/scheduler/resource/RasNodes.java | 12 +
.../normalization/NormalizedResources.java | 21 +-
.../scheduling/BaseResourceAwareStrategy.java | 52 +-
.../strategies/scheduling/ObjectResourcesItem.java | 9 +-
.../strategies/scheduling/sorter/INodeSorter.java | 12 +-
.../strategies/scheduling/sorter/NodeSorter.java | 196 ++--
...odeSorter.java => NodeSorterHostProximity.java} | 381 ++++---
.../TestUtilsForResourceAwareScheduler.java | 54 +-
.../scheduling/TestConstraintSolverStrategy.java | 16 +-
.../TestDefaultResourceAwareStrategy.java | 36 +-
.../sorter/TestNodeSorterHostProximity.java | 1036 ++++++++++++++++++++
12 files changed, 1567 insertions(+), 300 deletions(-)
diff --git
a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
index c584351..cc3561e 100644
---
a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
+++
b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
@@ -19,12 +19,15 @@
package org.apache.storm.scheduler;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.storm.daemon.nimbus.TopologyResources;
import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
import
org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
import
org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
@@ -283,6 +286,18 @@ public interface ISchedulingState {
Map<String, List<String>> getNetworkTopography();
/**
+ * Get host -> rack map - the inverse of networkTopography.
+ */
+ default Map<String, String> getHostToRack() {
+ Map<String, String> ret = new HashMap<>();
+ Map<String, List<String>> networkTopography = getNetworkTopography();
+ if (networkTopography != null) {
+ networkTopography.forEach((rack, hosts) -> hosts.forEach(host ->
ret.put(host, rack)));
+ }
+ return ret;
+ }
+
+ /**
* Get all topology scheduler statuses.
*/
Map<String, String> getStatusMap();
@@ -339,4 +354,31 @@ public interface ISchedulingState {
* Get the nimbus configuration.
*/
Map<String, Object> getConf();
+
+ /**
+ * Determine the set of racks on which topologyIds have been assigned.
Note that the returned set
+ * may contain {@link DNSToSwitchMapping#DEFAULT_RACK} if {@link
#getHostToRack()} is empty or
+ * does not contain the assigned host.
+ *
+ * @param topologyIds for which assignments are examined.
+ * @return set of racks on which assignments have been made.
+ */
+ default Set<String> getAssignedRacks(String... topologyIds) {
+ Set<String> ret = new HashSet<>();
+ Map<String, String> networkTopographyInverted = getHostToRack();
+ for (String topologyId: topologyIds) {
+ SchedulerAssignment assignment = getAssignmentById(topologyId);
+ if (assignment == null) {
+ continue;
+ }
+ for (WorkerSlot slot : assignment.getSlots()) {
+ String nodeId = slot.getNodeId();
+ SupervisorDetails supervisorDetails =
getSupervisorById(nodeId);
+ String hostId = supervisorDetails.getHost();
+ String rackId = networkTopographyInverted.getOrDefault(hostId,
DNSToSwitchMapping.DEFAULT_RACK);
+ ret.add(rackId);
+ }
+ }
+ return ret;
+ }
}
diff --git
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNodes.java
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNodes.java
index 427fb56..645c4e0 100644
---
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNodes.java
+++
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNodes.java
@@ -156,6 +156,18 @@ public class RasNodes {
return hostnameToNodes;
}
+ /**
+ * Get a map from RasNodeId to HostName.
+ *
+ * @return map of nodeId to hostname
+ */
+ public Map<String, String> getNodeIdToHostname() {
+ Map<String, String> nodeIdToHostname = new HashMap<>();
+ nodeMap.values()
+ .forEach(node -> nodeIdToHostname.put(node.getId(),
node.getHostname()));
+ return nodeIdToHostname;
+ }
+
@Override
public String toString() {
StringBuilder ret = new StringBuilder();
diff --git
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
index ea77b1a..de097a6 100644
---
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
+++
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
@@ -247,10 +247,16 @@ public class NormalizedResources {
return null;
}
+ private void throwBecauseUsedIsNotSubsetOfTotal(NormalizedResources used,
double totalMemoryMb, double usedMemoryMb, String info) {
+ throw new IllegalArgumentException(String.format("The used resources
must be a subset of the total resources."
+ + " Used: '%s', Total: '%s', Used Mem: '%f', Total
Mem: '%f', additionalInfo: '%s'",
+ used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb,
totalMemoryMb, info));
+ }
+
private void throwBecauseUsedIsNotSubsetOfTotal(NormalizedResources used,
double totalMemoryMb, double usedMemoryMb) {
throw new IllegalArgumentException(String.format("The used resources
must be a subset of the total resources."
- + " Used: '%s',
Total: '%s', Used Mem: '%f', Total Mem: '%f'",
-
used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb));
+ + " Used: '%s', Total: '%s', Used Mem: '%f', Total
Mem: '%f'",
+ used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb,
totalMemoryMb));
}
/**
@@ -375,7 +381,8 @@ public class NormalizedResources {
return 0;
}
if (used.otherResources[i] > otherResources[i]) {
- throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb,
usedMemoryMb);
+ String info = String.format("%s, %f > %f",
getResourceNameForResourceIndex(i), used.otherResources[i], otherResources[i]);
+ throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb,
usedMemoryMb, info);
}
min = Math.min(min, used.otherResources[i] / otherResources[i]);
}
@@ -384,14 +391,16 @@ public class NormalizedResources {
/**
* If a node or rack has a kind of resource not in a request, make that
resource negative so when sorting that node or rack will
- * be less likely to be selected.
+ * be less likely to be selected. If the resource is in the request, make
that resource positive.
* @param request the requested resources.
*/
public void updateForRareResourceAffinity(NormalizedResources request) {
int length = Math.min(this.otherResources.length,
request.otherResources.length);
for (int i = 0; i < length; i++) {
- if (request.getResourceAt(i) == 0.0) {
- this.otherResources[i] = -1 * this.otherResources[i];
+ if (request.getResourceAt(i) == 0.0 && this.otherResources[i] >
0.0) {
+ this.otherResources[i] = -this.otherResources[i]; // make
negative
+ } else if (request.getResourceAt(i) > 0.0 &&
this.otherResources[i] < 0.0) {
+ this.otherResources[i] = -this.otherResources[i]; // make
positive
}
}
}
diff --git
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 6f49083..4bc4d0b 100644
---
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -44,6 +44,7 @@ import
org.apache.storm.scheduler.resource.strategies.scheduling.sorter.ExecSort
import
org.apache.storm.scheduler.resource.strategies.scheduling.sorter.IExecSorter;
import
org.apache.storm.scheduler.resource.strategies.scheduling.sorter.INodeSorter;
import
org.apache.storm.scheduler.resource.strategies.scheduling.sorter.NodeSorter;
+import
org.apache.storm.scheduler.resource.strategies.scheduling.sorter.NodeSorterHostProximity;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
import org.slf4j.Logger;
@@ -58,9 +59,25 @@ public abstract class BaseResourceAwareStrategy implements
IStrategy {
* Refer to {@link NodeSorter#NodeSorter(Cluster, TopologyDetails,
NodeSortType)} for more details.
*/
public enum NodeSortType {
- GENERIC_RAS, // for deprecation, Used by
GenericResourceAwareStrategyOld
- DEFAULT_RAS, // for deprecation, Used by
DefaultResourceAwareStrategyOld
- COMMON // new and only node sorting type going forward
+ /**
+ * Generic Resource Aware Strategy sorting type.
+ * @deprecated used by GenericResourceAwareStrategyOld only. Use {@link
#COMMON} instead.
+ */
+ @Deprecated
+ GENERIC_RAS,
+
+ /**
+ * Default Resource Aware Strategy sorting type.
+ * @deprecated used by DefaultResourceAwareStrategyOld only. Use {@link
#COMMON} instead.
+ */
+ @Deprecated
+ DEFAULT_RAS,
+
+ /**
+ * New and only node sorting type going forward.
+ * Refer to {@link NodeSorterHostProximity#NodeSorterHostProximity(Cluster,
TopologyDetails)} for more details.
+ */
+ COMMON,
}
// instance variables from class instantiation
@@ -149,7 +166,8 @@ public abstract class BaseResourceAwareStrategy implements
IStrategy {
List<ExecutorDetails> orderedExecutors =
execSorter.sortExecutors(unassignedExecutors);
Iterable<String> sortedNodes = null;
if (!this.sortNodesForEachExecutor) {
- sortedNodes = nodeSorter.sortAllNodes(null);
+ nodeSorter.prepare(null);
+ sortedNodes = nodeSorter.sortAllNodes();
}
return scheduleExecutorsOnNodes(orderedExecutors, sortedNodes);
}
@@ -189,7 +207,7 @@ public abstract class BaseResourceAwareStrategy implements
IStrategy {
LOG.debug("The max state search that will be used by topology {} is
{}", topologyDetails.getId(), maxStateSearch);
searcherState = createSearcherState();
- setNodeSorter(new NodeSorter(cluster, topologyDetails, nodeSortType));
+ setNodeSorter(new NodeSorterHostProximity(cluster, topologyDetails,
nodeSortType));
setExecSorter(orderExecutorsByProximity
? new ExecSorterByProximity(topologyDetails)
: new ExecSorterByConnectionCount(topologyDetails));
@@ -413,7 +431,8 @@ public abstract class BaseResourceAwareStrategy implements
IStrategy {
for (int i = 0; i < maxExecCnt ; i++) {
progressIdxForExec[i] = -1;
}
- LOG.info("scheduleExecutorsOnNodes: will assign {} executors for topo
{}", maxExecCnt, topoName);
+ LOG.debug("scheduleExecutorsOnNodes: will assign {} executors for topo
{}, sortNodesForEachExecutor={}",
+ maxExecCnt, topoName, sortNodesForEachExecutor);
OUTERMOST_LOOP:
for (int loopCnt = 0 ; true ; loopCnt++) {
@@ -450,7 +469,8 @@ public abstract class BaseResourceAwareStrategy implements
IStrategy {
String comp = execToComp.get(exec);
if (sortedNodesIter == null || (this.sortNodesForEachExecutor &&
searcherState.isExecCompDifferentFromPrior())) {
progressIdx = -1;
- sortedNodesIter = nodeSorter.sortAllNodes(exec);
+ nodeSorter.prepare(exec);
+ sortedNodesIter = nodeSorter.sortAllNodes();
}
for (String nodeId : sortedNodesIter) {
@@ -470,8 +490,8 @@ public abstract class BaseResourceAwareStrategy implements
IStrategy {
if (numBoundAckerAssigned == -1) {
// This only happens when trying to assign bound
ackers to the worker slot and failed.
// Free the entire worker slot and put those bound
ackers back to unassigned list
- LOG.debug("Failed to assign bound acker for exec: {}
of topo: {} to worker: {}. Backtracking.",
- exec, topoName, workerSlot);
+ LOG.debug("Failed to assign bound acker for exec={},
comp={}, topo: {} to worker: {}. Backtracking.",
+ exec, comp, topoName, workerSlot);
searcherState.freeWorkerSlotWithBoundAckers(node,
workerSlot);
continue;
}
@@ -481,9 +501,13 @@ public abstract class BaseResourceAwareStrategy implements
IStrategy {
// and this is not the first exec to this workerSlot.
// So just go to next workerSlot and don't free the
worker.
if (numBoundAckerAssigned > 0) {
- LOG.debug("Failed to assign exec: {} of topo: {}
with bound ackers to worker: {}. Backtracking.",
- exec, topoName, workerSlot);
+ LOG.debug("Failed to assign exec={}, comp={},
topo={} with bound ackers to worker: {}. Backtracking.",
+ exec, comp, topoName, workerSlot);
searcherState.freeWorkerSlotWithBoundAckers(node,
workerSlot);
+ } else {
+ LOG.debug("Failed to assign exec={}, comp={},
topo={} to worker={} on node=({}, availCpu={}, availMem={}).",
+ exec, comp, topoName, workerSlot,
+ node.getId(), node.getAvailableCpuResources(),
node.getAvailableMemoryResources());
}
continue;
}
@@ -506,8 +530,10 @@ public abstract class BaseResourceAwareStrategy implements
IStrategy {
searcherState = searcherState.nextExecutor();
nodeForExec[execIndex] = node;
workerSlotForExec[execIndex] = workerSlot;
- LOG.debug("scheduleExecutorsOnNodes: Assigned execId={},
comp={} to node={}, slot-ordinal={} at loopCnt={}, topo={}",
- execIndex, comp, nodeId, progressIdx, loopCnt,
topoName);
+ LOG.debug("scheduleExecutorsOnNodes: Assigned execId={},
comp={} to node={}/cpu={}/mem={}, "
+ + "slot-ordinal={} at loopCnt={}, topo={}",
+ execIndex, comp, nodeId,
node.getAvailableCpuResources(), node.getAvailableMemoryResources(),
+ progressIdx, loopCnt, topoName);
continue OUTERMOST_LOOP;
}
}
diff --git
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ObjectResourcesItem.java
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ObjectResourcesItem.java
index 5532d78..7b4f7d2 100644
---
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ObjectResourcesItem.java
+++
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ObjectResourcesItem.java
@@ -30,7 +30,7 @@ public class ObjectResourcesItem {
/**
* Amongst all {@link #availableResources}, this is the minimum ratio of
resource to the total available in group.
- * Note that nodes are grouped into racks. And racks are grouped under the
cluster.
+ * Note that nodes are grouped into hosts. Hosts into racks. And racks are
grouped under the cluster.
*
* <p>
* An example of this calculation is in
@@ -43,7 +43,7 @@ public class ObjectResourcesItem {
/**
* Amongst all {@link #availableResources}, this is the average ratio of
resource to the total available in group.
- * Note that nodes are grouped into racks. And racks are grouped under the
cluster.
+ * Note that nodes are grouped into hosts, hosts into racks, and racks are
grouped under the cluster.
*
* <p>
* An example of this calculation is in
@@ -73,6 +73,11 @@ public class ObjectResourcesItem {
this.avgResourcePercent = avgResourcePercent;
}
+ public void add(ObjectResourcesItem other) {
+ this.availableResources.add(other.availableResources);
+ this.totalResources.add(other.totalResources);
+ }
+
@Override
public String toString() {
return this.id;
diff --git
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/INodeSorter.java
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/INodeSorter.java
index ceda82e..f2a4e8d 100644
---
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/INodeSorter.java
+++
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/INodeSorter.java
@@ -18,14 +18,20 @@
package org.apache.storm.scheduler.resource.strategies.scheduling.sorter;
-import java.util.TreeSet;
import org.apache.storm.scheduler.ExecutorDetails;
import
org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesItem;
public interface INodeSorter {
- TreeSet<ObjectResourcesItem> sortRacks(ExecutorDetails exec);
+ /**
+ * Prepare for node sorting. This method must be called before {@link
#getSortedRacks()} and {@link #sortAllNodes()}.
+ *
+ * @param exec optional, may be null.
+ */
+ void prepare(ExecutorDetails exec);
- Iterable<String> sortAllNodes(ExecutorDetails exec);
+ Iterable<ObjectResourcesItem> getSortedRacks();
+
+ Iterable<String> sortAllNodes();
}
diff --git
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java
index 85a9015..7ff4b05 100644
---
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java
+++
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java
@@ -21,6 +21,7 @@ package
org.apache.storm.scheduler.resource.strategies.scheduling.sorter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -28,7 +29,6 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
-import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -70,6 +70,9 @@ public class NodeSorter implements INodeSorter {
protected List<String> favoredNodeIds;
protected List<String> unFavoredNodeIds;
+ // Updated in prepare method
+ ExecutorDetails exec;
+
/**
* Initialize for the default implementation node sorting.
*
@@ -93,21 +96,18 @@ public class NodeSorter implements INodeSorter {
// from Cluster
networkTopography = cluster.getNetworkTopography();
- Map<String, String> hostToRack = new HashMap<>();
- for (Map.Entry<String, List<String>> entry :
networkTopography.entrySet()) {
- String rackId = entry.getKey();
- for (String hostName: entry.getValue()) {
- hostToRack.put(hostName, rackId);
- }
- }
+ Map<String, String> hostToRack = cluster.getHostToRack();
RasNodes nodes = new RasNodes(cluster);
for (RasNode node: nodes.getNodes()) {
String superId = node.getId();
String hostName = node.getHostname();
+ if (!node.isAlive() || hostName == null) {
+ continue;
+ }
String rackId = hostToRack.getOrDefault(hostName,
DNSToSwitchMapping.DEFAULT_RACK);
superIdToRack.put(superId, rackId);
hostnameToNodes.computeIfAbsent(hostName, (hn) -> new
ArrayList<>()).add(node);
- rackIdToNodes.computeIfAbsent(rackId, (hn) -> new
ArrayList<>()).add(node);
+ rackIdToNodes.computeIfAbsent(rackId, (rid) -> new
ArrayList<>()).add(node);
}
this.greyListedSupervisorIds = cluster.getGreyListedSupervisors();
@@ -122,16 +122,21 @@ public class NodeSorter implements INodeSorter {
unFavoredNodeIds.removeAll(favoredNodeIds);
}
+ @Override
+ public void prepare(ExecutorDetails exec) {
+ this.exec = exec;
+ }
+
/**
- * Scheduling uses {@link #sortAllNodes(ExecutorDetails)} which eventually
- * calls this method whose behavior can altered by setting {@link
#nodeSortType}.
+ * Scheduling uses {@link #sortAllNodes()} which eventually
+ * calls this method whose behavior can be altered by setting {@link
#nodeSortType}.
*
* @param resourcesSummary contains all individual {@link
ObjectResourcesItem} as well as cumulative stats
* @param exec executor for which the sorting is done
* @param existingScheduleFunc a function to get existing executors
already scheduled on this object
* @return a sorted list of {@link ObjectResourcesItem}
*/
- protected TreeSet<ObjectResourcesItem> sortObjectResources(
+ protected List<ObjectResourcesItem> sortObjectResources(
ObjectResourcesSummary resourcesSummary, ExecutorDetails exec,
ExistingScheduleFunc existingScheduleFunc) {
switch (nodeSortType) {
case DEFAULT_RAS:
@@ -180,7 +185,7 @@ public class NodeSorter implements INodeSorter {
* @param existingScheduleFunc a function to get existing executors
already scheduled on this object
* @return a sorted list of ObjectResources
*/
- private TreeSet<ObjectResourcesItem> sortObjectResourcesCommon(
+ private List<ObjectResourcesItem> sortObjectResourcesCommon(
final ObjectResourcesSummary allResources, final ExecutorDetails
exec,
final ExistingScheduleFunc existingScheduleFunc) {
// Copy and modify allResources
@@ -203,34 +208,31 @@ public class NodeSorter implements INodeSorter {
);
// Use the following comparator to return a sorted set
- TreeSet<ObjectResourcesItem> sortedObjectResources =
- new TreeSet<>((o1, o2) -> {
- int execsScheduled1 =
existingScheduleFunc.getNumExistingSchedule(o1.id);
- int execsScheduled2 =
existingScheduleFunc.getNumExistingSchedule(o2.id);
- if (execsScheduled1 > execsScheduled2) {
- return -1;
- } else if (execsScheduled1 < execsScheduled2) {
- return 1;
- } else {
- double o1Avg = o1.avgResourcePercent;
- double o2Avg = o2.avgResourcePercent;
-
- if (o1Avg > o2Avg) {
- return -1;
- } else if (o1Avg < o2Avg) {
- return 1;
- } else {
- if (o1.minResourcePercent > o2.minResourcePercent)
{
- return -1;
- } else if (o1.minResourcePercent <
o2.minResourcePercent) {
- return 1;
- } else {
- return o1.id.compareTo(o2.id);
- }
- }
- }
- });
+ List<ObjectResourcesItem> sortedObjectResources = new ArrayList();
+ Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+ int execsScheduled1 =
existingScheduleFunc.getNumExistingSchedule(o1.id);
+ int execsScheduled2 =
existingScheduleFunc.getNumExistingSchedule(o2.id);
+ if (execsScheduled1 > execsScheduled2) {
+ return -1;
+ } else if (execsScheduled1 < execsScheduled2) {
+ return 1;
+ }
+ double o1Avg = o1.avgResourcePercent;
+ double o2Avg = o2.avgResourcePercent;
+ if (o1Avg > o2Avg) {
+ return -1;
+ } else if (o1Avg < o2Avg) {
+ return 1;
+ }
+ if (o1.minResourcePercent > o2.minResourcePercent) {
+ return -1;
+ } else if (o1.minResourcePercent < o2.minResourcePercent) {
+ return 1;
+ }
+ return o1.id.compareTo(o2.id);
+ };
sortedObjectResources.addAll(affinityBasedAllResources.getObjectResources());
+ sortedObjectResources.sort(comparator);
LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
return sortedObjectResources;
}
@@ -258,38 +260,35 @@ public class NodeSorter implements INodeSorter {
* @return a sorted list of ObjectResources
*/
@Deprecated
- private TreeSet<ObjectResourcesItem> sortObjectResourcesGeneric(
+ private List<ObjectResourcesItem> sortObjectResourcesGeneric(
final ObjectResourcesSummary allResources, ExecutorDetails exec,
final ExistingScheduleFunc existingScheduleFunc) {
ObjectResourcesSummary affinityBasedAllResources = new
ObjectResourcesSummary(allResources);
NormalizedResourceRequest requestedResources =
topologyDetails.getTotalResources(exec);
- for (ObjectResourcesItem objectResources :
affinityBasedAllResources.getObjectResources()) {
-
objectResources.availableResources.updateForRareResourceAffinity(requestedResources);
- }
+ affinityBasedAllResources.getObjectResources()
+ .forEach(x ->
x.availableResources.updateForRareResourceAffinity(requestedResources));
final NormalizedResourceOffer availableResourcesOverall =
allResources.getAvailableResourcesOverall();
- TreeSet<ObjectResourcesItem> sortedObjectResources =
- new TreeSet<>((o1, o2) -> {
- int execsScheduled1 =
existingScheduleFunc.getNumExistingSchedule(o1.id);
- int execsScheduled2 =
existingScheduleFunc.getNumExistingSchedule(o2.id);
- if (execsScheduled1 > execsScheduled2) {
- return -1;
- } else if (execsScheduled1 < execsScheduled2) {
- return 1;
- } else {
- double o1Avg =
availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources);
- double o2Avg =
availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources);
-
- if (o1Avg > o2Avg) {
- return -1;
- } else if (o1Avg < o2Avg) {
- return 1;
- } else {
- return o1.id.compareTo(o2.id);
- }
- }
- });
+ List<ObjectResourcesItem> sortedObjectResources = new ArrayList<>();
+ Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+ int execsScheduled1 =
existingScheduleFunc.getNumExistingSchedule(o1.id);
+ int execsScheduled2 =
existingScheduleFunc.getNumExistingSchedule(o2.id);
+ if (execsScheduled1 > execsScheduled2) {
+ return -1;
+ } else if (execsScheduled1 < execsScheduled2) {
+ return 1;
+ }
+ double o1Avg =
availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources);
+ double o2Avg =
availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources);
+ if (o1Avg > o2Avg) {
+ return -1;
+ } else if (o1Avg < o2Avg) {
+ return 1;
+ }
+ return o1.id.compareTo(o2.id);
+ };
sortedObjectResources.addAll(affinityBasedAllResources.getObjectResources());
+ sortedObjectResources.sort(comparator);
LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
return sortedObjectResources;
}
@@ -313,7 +312,7 @@ public class NodeSorter implements INodeSorter {
* @return a sorted list of ObjectResources
*/
@Deprecated
- private TreeSet<ObjectResourcesItem> sortObjectResourcesDefault(
+ private List<ObjectResourcesItem> sortObjectResourcesDefault(
final ObjectResourcesSummary allResources,
final ExistingScheduleFunc existingScheduleFunc) {
@@ -328,32 +327,30 @@ public class NodeSorter implements INodeSorter {
existingScheduleFunc.getNumExistingSchedule(objectResources.id));
}
- TreeSet<ObjectResourcesItem> sortedObjectResources =
- new TreeSet<>((o1, o2) -> {
- int execsScheduled1 =
existingScheduleFunc.getNumExistingSchedule(o1.id);
- int execsScheduled2 =
existingScheduleFunc.getNumExistingSchedule(o2.id);
- if (execsScheduled1 > execsScheduled2) {
- return -1;
- } else if (execsScheduled1 < execsScheduled2) {
- return 1;
- } else {
- if (o1.minResourcePercent > o2.minResourcePercent) {
- return -1;
- } else if (o1.minResourcePercent <
o2.minResourcePercent) {
- return 1;
- } else {
- double diff = o1.avgResourcePercent -
o2.avgResourcePercent;
- if (diff > 0.0) {
- return -1;
- } else if (diff < 0.0) {
- return 1;
- } else {
- return o1.id.compareTo(o2.id);
- }
- }
- }
- });
+ List<ObjectResourcesItem> sortedObjectResources = new ArrayList<>();
+ Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+ int execsScheduled1 =
existingScheduleFunc.getNumExistingSchedule(o1.id);
+ int execsScheduled2 =
existingScheduleFunc.getNumExistingSchedule(o2.id);
+ if (execsScheduled1 > execsScheduled2) {
+ return -1;
+ } else if (execsScheduled1 < execsScheduled2) {
+ return 1;
+ }
+ if (o1.minResourcePercent > o2.minResourcePercent) {
+ return -1;
+ } else if (o1.minResourcePercent < o2.minResourcePercent) {
+ return 1;
+ }
+ double diff = o1.avgResourcePercent - o2.avgResourcePercent;
+ if (diff > 0.0) {
+ return -1;
+ } else if (diff < 0.0) {
+ return 1;
+ }
+ return o1.id.compareTo(o2.id);
+ };
sortedObjectResources.addAll(allResources.getObjectResources());
+ sortedObjectResources.sort(comparator);
LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
return sortedObjectResources;
}
@@ -375,7 +372,7 @@ public class NodeSorter implements INodeSorter {
* @param rackId the rack id availNodes are a part of
* @return a sorted list of nodes.
*/
- private TreeSet<ObjectResourcesItem> sortNodes(
+ private List<ObjectResourcesItem> sortNodes(
List<RasNode> availRasNodes, ExecutorDetails exec, String rackId,
Map<String, AtomicInteger> scheduledCount) {
ObjectResourcesSummary rackResourcesSummary = new
ObjectResourcesSummary("RACK");
@@ -428,7 +425,7 @@ public class NodeSorter implements INodeSorter {
private final Iterator<String> post;
private final Set<String> skip;
- LazyNodeSortingIterator(LazyNodeSorting parent,
TreeSet<ObjectResourcesItem> sortedRacks) {
+ LazyNodeSortingIterator(LazyNodeSorting parent,
List<ObjectResourcesItem> sortedRacks) {
this.parent = parent;
rackIterator = sortedRacks.iterator();
pre = favoredNodeIds.iterator();
@@ -495,8 +492,8 @@ public class NodeSorter implements INodeSorter {
private class LazyNodeSorting implements Iterable<String> {
private final Map<String, AtomicInteger> perNodeScheduledCount = new
HashMap<>();
- private final TreeSet<ObjectResourcesItem> sortedRacks;
- private final Map<String, TreeSet<ObjectResourcesItem>> cachedNodes =
new HashMap<>();
+ private final List<ObjectResourcesItem> sortedRacks;
+ private final Map<String, List<ObjectResourcesItem>> cachedNodes = new
HashMap<>();
private final ExecutorDetails exec;
private final Set<String> skippedNodeIds = new HashSet<>();
@@ -516,10 +513,10 @@ public class NodeSorter implements INodeSorter {
.getAndAdd(entry.getValue().size());
}
}
- sortedRacks = sortRacks(exec);
+ sortedRacks = getSortedRacks();
}
- private TreeSet<ObjectResourcesItem> getSortedNodesFor(String rackId) {
+ private List<ObjectResourcesItem> getSortedNodesFor(String rackId) {
return cachedNodes.computeIfAbsent(rackId,
(rid) -> sortNodes(rackIdToNodes.getOrDefault(rid,
Collections.emptyList()), exec, rid, perNodeScheduledCount));
}
@@ -531,7 +528,7 @@ public class NodeSorter implements INodeSorter {
}
@Override
- public Iterable<String> sortAllNodes(ExecutorDetails exec) {
+ public Iterable<String> sortAllNodes() {
return new LazyNodeSorting(exec);
}
@@ -590,8 +587,7 @@ public class NodeSorter implements INodeSorter {
*
* @return a sorted list of racks
*/
- @Override
- public TreeSet<ObjectResourcesItem> sortRacks(ExecutorDetails exec) {
+ public List<ObjectResourcesItem> getSortedRacks() {
final ObjectResourcesSummary clusterResourcesSummary =
createClusterSummarizedResources();
final Map<String, AtomicInteger> scheduledCount =
getScheduledExecCntByRackId();
diff --git
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java
similarity index 66%
copy from
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java
copy to
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java
index 85a9015..64f9f08 100644
---
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java
+++
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java
@@ -21,6 +21,7 @@ package
org.apache.storm.scheduler.resource.strategies.scheduling.sorter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -32,7 +33,6 @@ import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-
import org.apache.storm.Config;
import org.apache.storm.networktopography.DNSToSwitchMapping;
import org.apache.storm.scheduler.Cluster;
@@ -47,11 +47,12 @@ import
org.apache.storm.scheduler.resource.normalization.NormalizedResourceReque
import
org.apache.storm.scheduler.resource.strategies.scheduling.BaseResourceAwareStrategy;
import
org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesItem;
import
org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesSummary;
+import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class NodeSorter implements INodeSorter {
- private static final Logger LOG =
LoggerFactory.getLogger(NodeSorter.class);
+public class NodeSorterHostProximity implements INodeSorter {
+ private static final Logger LOG =
LoggerFactory.getLogger(NodeSorterHostProximity.class);
// instance variables from class instantiation
protected final BaseResourceAwareStrategy.NodeSortType nodeSortType;
@@ -60,56 +61,60 @@ public class NodeSorter implements INodeSorter {
protected TopologyDetails topologyDetails;
// Instance variables derived from Cluster.
- private final Map<String, List<String>> networkTopography;
private final Map<String, String> superIdToRack = new HashMap<>();
private final Map<String, List<RasNode>> hostnameToNodes = new HashMap<>();
- private final Map<String, List<RasNode>> rackIdToNodes = new HashMap<>();
+ private final Map<String, String> nodeIdToHostname = new HashMap<>();
+ private final Map<String, Set<String>> rackIdToHosts = new HashMap<>();
protected List<String> greyListedSupervisorIds;
// Instance variables from Cluster and TopologyDetails.
protected List<String> favoredNodeIds;
protected List<String> unFavoredNodeIds;
+ // Updated in prepare method
+ ExecutorDetails exec;
+
+ public NodeSorterHostProximity(Cluster cluster, TopologyDetails
topologyDetails) {
+ this(cluster, topologyDetails,
BaseResourceAwareStrategy.NodeSortType.COMMON);
+ }
+
/**
* Initialize for the default implementation node sorting.
*
* <p>
* <li>{@link BaseResourceAwareStrategy.NodeSortType#GENERIC_RAS} sorting
implemented in
- * {@link #sortObjectResourcesGeneric(ObjectResourcesSummary,
ExecutorDetails, NodeSorter.ExistingScheduleFunc)}</li>
+ * {@link #sortObjectResourcesGeneric(ObjectResourcesSummary,
ExecutorDetails, NodeSorterHostProximity.ExistingScheduleFunc)}</li>
* <li>{@link BaseResourceAwareStrategy.NodeSortType#DEFAULT_RAS} sorting
implemented in
- * {@link #sortObjectResourcesDefault(ObjectResourcesSummary,
NodeSorter.ExistingScheduleFunc)}</li>
+ * {@link #sortObjectResourcesDefault(ObjectResourcesSummary,
NodeSorterHostProximity.ExistingScheduleFunc)}</li>
* <li>{@link BaseResourceAwareStrategy.NodeSortType#COMMON} sorting
implemented in
- * {@link #sortObjectResourcesCommon(ObjectResourcesSummary,
ExecutorDetails, NodeSorter.ExistingScheduleFunc)}</li>
+ * {@link #sortObjectResourcesCommon(ObjectResourcesSummary,
ExecutorDetails, NodeSorterHostProximity.ExistingScheduleFunc)}</li>
* </p>
*
* @param cluster for which nodes will be sorted.
* @param topologyDetails the topology to sort for.
* @param nodeSortType type of sorting to be applied to object resource
collection {@link BaseResourceAwareStrategy.NodeSortType}.
*/
- public NodeSorter(Cluster cluster, TopologyDetails topologyDetails,
BaseResourceAwareStrategy.NodeSortType nodeSortType) {
+ public NodeSorterHostProximity(Cluster cluster, TopologyDetails
topologyDetails, BaseResourceAwareStrategy.NodeSortType nodeSortType) {
this.cluster = cluster;
this.topologyDetails = topologyDetails;
this.nodeSortType = nodeSortType;
// from Cluster
- networkTopography = cluster.getNetworkTopography();
- Map<String, String> hostToRack = new HashMap<>();
- for (Map.Entry<String, List<String>> entry :
networkTopography.entrySet()) {
- String rackId = entry.getKey();
- for (String hostName: entry.getValue()) {
- hostToRack.put(hostName, rackId);
- }
- }
+ greyListedSupervisorIds = cluster.getGreyListedSupervisors();
+ Map<String, String> hostToRack = cluster.getHostToRack();
RasNodes nodes = new RasNodes(cluster);
for (RasNode node: nodes.getNodes()) {
String superId = node.getId();
String hostName = node.getHostname();
+ if (!node.isAlive() || hostName == null) {
+ continue;
+ }
String rackId = hostToRack.getOrDefault(hostName,
DNSToSwitchMapping.DEFAULT_RACK);
superIdToRack.put(superId, rackId);
hostnameToNodes.computeIfAbsent(hostName, (hn) -> new
ArrayList<>()).add(node);
- rackIdToNodes.computeIfAbsent(rackId, (hn) -> new
ArrayList<>()).add(node);
+ nodeIdToHostname.put(superId, hostName);
+ rackIdToHosts.computeIfAbsent(rackId, r -> new
HashSet<>()).add(hostName);
}
- this.greyListedSupervisorIds = cluster.getGreyListedSupervisors();
// from TopologyDetails
Map<String, Object> topoConf = topologyDetails.getConf();
@@ -122,16 +127,26 @@ public class NodeSorter implements INodeSorter {
unFavoredNodeIds.removeAll(favoredNodeIds);
}
+ @VisibleForTesting
+ public Map<String, Set<String>> getRackIdToHosts() {
+ return rackIdToHosts;
+ }
+
+ @Override
+ public void prepare(ExecutorDetails exec) {
+ this.exec = exec;
+ }
+
/**
- * Scheduling uses {@link #sortAllNodes(ExecutorDetails)} which eventually
- * calls this method whose behavior can altered by setting {@link
#nodeSortType}.
+ * Scheduling uses {@link #sortAllNodes()} which eventually
+ * calls this method whose behavior can be altered by setting {@link
#nodeSortType}.
*
* @param resourcesSummary contains all individual {@link
ObjectResourcesItem} as well as cumulative stats
* @param exec executor for which the sorting is done
* @param existingScheduleFunc a function to get existing executors
already scheduled on this object
- * @return a sorted list of {@link ObjectResourcesItem}
+ * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
*/
- protected TreeSet<ObjectResourcesItem> sortObjectResources(
+ protected Iterable<ObjectResourcesItem> sortObjectResources(
ObjectResourcesSummary resourcesSummary, ExecutorDetails exec,
ExistingScheduleFunc existingScheduleFunc) {
switch (nodeSortType) {
case DEFAULT_RAS:
@@ -178,9 +193,9 @@ public class NodeSorter implements INodeSorter {
* @param allResources contains all individual ObjectResources as
well as cumulative stats
* @param exec executor for which the sorting is done
* @param existingScheduleFunc a function to get existing executors
already scheduled on this object
- * @return a sorted list of ObjectResources
+ * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
*/
- private TreeSet<ObjectResourcesItem> sortObjectResourcesCommon(
+ private Iterable<ObjectResourcesItem> sortObjectResourcesCommon(
final ObjectResourcesSummary allResources, final ExecutorDetails
exec,
final ExistingScheduleFunc existingScheduleFunc) {
// Copy and modify allResources
@@ -189,11 +204,11 @@ public class NodeSorter implements INodeSorter {
final NormalizedResourceRequest requestedResources = (exec != null) ?
topologyDetails.getTotalResources(exec) : null;
affinityBasedAllResources.getObjectResources().forEach(
x -> {
- x.minResourcePercent =
availableResourcesOverall.calculateMinPercentageUsedBy(x.availableResources);
if (requestedResources != null) {
// negate unrequested resources
x.availableResources.updateForRareResourceAffinity(requestedResources);
}
+ x.minResourcePercent =
availableResourcesOverall.calculateMinPercentageUsedBy(x.availableResources);
x.avgResourcePercent =
availableResourcesOverall.calculateAveragePercentageUsedBy(x.availableResources);
LOG.trace("for {}: minResourcePercent={},
avgResourcePercent={}, numExistingSchedule={}",
@@ -202,34 +217,30 @@ public class NodeSorter implements INodeSorter {
}
);
- // Use the following comparator to return a sorted set
- TreeSet<ObjectResourcesItem> sortedObjectResources =
- new TreeSet<>((o1, o2) -> {
- int execsScheduled1 =
existingScheduleFunc.getNumExistingSchedule(o1.id);
- int execsScheduled2 =
existingScheduleFunc.getNumExistingSchedule(o2.id);
- if (execsScheduled1 > execsScheduled2) {
- return -1;
- } else if (execsScheduled1 < execsScheduled2) {
- return 1;
- } else {
- double o1Avg = o1.avgResourcePercent;
- double o2Avg = o2.avgResourcePercent;
-
- if (o1Avg > o2Avg) {
- return -1;
- } else if (o1Avg < o2Avg) {
- return 1;
- } else {
- if (o1.minResourcePercent > o2.minResourcePercent)
{
- return -1;
- } else if (o1.minResourcePercent <
o2.minResourcePercent) {
- return 1;
- } else {
- return o1.id.compareTo(o2.id);
- }
- }
- }
- });
+ // Use the following comparator to sort
+ Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+ int execsScheduled1 =
existingScheduleFunc.getNumExistingSchedule(o1.id);
+ int execsScheduled2 =
existingScheduleFunc.getNumExistingSchedule(o2.id);
+ if (execsScheduled1 > execsScheduled2) {
+ return -1;
+ } else if (execsScheduled1 < execsScheduled2) {
+ return 1;
+ }
+ double o1Avg = o1.avgResourcePercent;
+ double o2Avg = o2.avgResourcePercent;
+ if (o1Avg > o2Avg) {
+ return -1;
+ } else if (o1Avg < o2Avg) {
+ return 1;
+ }
+ if (o1.minResourcePercent > o2.minResourcePercent) {
+ return -1;
+ } else if (o1.minResourcePercent < o2.minResourcePercent) {
+ return 1;
+ }
+ return o1.id.compareTo(o2.id);
+ };
+ TreeSet<ObjectResourcesItem> sortedObjectResources = new
TreeSet(comparator);
sortedObjectResources.addAll(affinityBasedAllResources.getObjectResources());
LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
return sortedObjectResources;
@@ -255,40 +266,48 @@ public class NodeSorter implements INodeSorter {
* @param allResources contains all individual ObjectResources as
well as cumulative stats
* @param exec executor for which the sorting is done
* @param existingScheduleFunc a function to get existing executors
already scheduled on this object
- * @return a sorted list of ObjectResources
+ * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
*/
@Deprecated
- private TreeSet<ObjectResourcesItem> sortObjectResourcesGeneric(
+ private Iterable<ObjectResourcesItem> sortObjectResourcesGeneric(
final ObjectResourcesSummary allResources, ExecutorDetails exec,
final ExistingScheduleFunc existingScheduleFunc) {
ObjectResourcesSummary affinityBasedAllResources = new
ObjectResourcesSummary(allResources);
- NormalizedResourceRequest requestedResources =
topologyDetails.getTotalResources(exec);
- for (ObjectResourcesItem objectResources :
affinityBasedAllResources.getObjectResources()) {
-
objectResources.availableResources.updateForRareResourceAffinity(requestedResources);
- }
final NormalizedResourceOffer availableResourcesOverall =
allResources.getAvailableResourcesOverall();
+ final NormalizedResourceRequest requestedResources = (exec != null) ?
topologyDetails.getTotalResources(exec) : null;
+ affinityBasedAllResources.getObjectResources().forEach(
+ x -> {
+ if (requestedResources != null) {
+ // negate unrequested resources
+
x.availableResources.updateForRareResourceAffinity(requestedResources);
+ }
+ x.minResourcePercent =
availableResourcesOverall.calculateMinPercentageUsedBy(x.availableResources);
+ x.avgResourcePercent =
availableResourcesOverall.calculateAveragePercentageUsedBy(x.availableResources);
- TreeSet<ObjectResourcesItem> sortedObjectResources =
- new TreeSet<>((o1, o2) -> {
- int execsScheduled1 =
existingScheduleFunc.getNumExistingSchedule(o1.id);
- int execsScheduled2 =
existingScheduleFunc.getNumExistingSchedule(o2.id);
- if (execsScheduled1 > execsScheduled2) {
- return -1;
- } else if (execsScheduled1 < execsScheduled2) {
- return 1;
- } else {
- double o1Avg =
availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources);
- double o2Avg =
availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources);
-
- if (o1Avg > o2Avg) {
- return -1;
- } else if (o1Avg < o2Avg) {
- return 1;
- } else {
- return o1.id.compareTo(o2.id);
- }
- }
- });
+ LOG.trace("for {}: minResourcePercent={},
avgResourcePercent={}, numExistingSchedule={}",
+ x.id, x.minResourcePercent, x.avgResourcePercent,
+ existingScheduleFunc.getNumExistingSchedule(x.id));
+ }
+ );
+
+ Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+ int execsScheduled1 =
existingScheduleFunc.getNumExistingSchedule(o1.id);
+ int execsScheduled2 =
existingScheduleFunc.getNumExistingSchedule(o2.id);
+ if (execsScheduled1 > execsScheduled2) {
+ return -1;
+ } else if (execsScheduled1 < execsScheduled2) {
+ return 1;
+ }
+ double o1Avg = o1.avgResourcePercent;
+ double o2Avg = o2.avgResourcePercent;
+ if (o1Avg > o2Avg) {
+ return -1;
+ } else if (o1Avg < o2Avg) {
+ return 1;
+ }
+ return o1.id.compareTo(o2.id);
+ };
+ TreeSet<ObjectResourcesItem> sortedObjectResources = new
TreeSet<>(comparator);
sortedObjectResources.addAll(affinityBasedAllResources.getObjectResources());
LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
return sortedObjectResources;
@@ -310,10 +329,10 @@ public class NodeSorter implements INodeSorter {
*
* @param allResources contains all individual ObjectResources as
well as cumulative stats
* @param existingScheduleFunc a function to get existing executors
already scheduled on this object
- * @return a sorted list of ObjectResources
+ * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
*/
@Deprecated
- private TreeSet<ObjectResourcesItem> sortObjectResourcesDefault(
+ private Iterable<ObjectResourcesItem> sortObjectResourcesDefault(
final ObjectResourcesSummary allResources,
final ExistingScheduleFunc existingScheduleFunc) {
@@ -328,31 +347,28 @@ public class NodeSorter implements INodeSorter {
existingScheduleFunc.getNumExistingSchedule(objectResources.id));
}
- TreeSet<ObjectResourcesItem> sortedObjectResources =
- new TreeSet<>((o1, o2) -> {
- int execsScheduled1 =
existingScheduleFunc.getNumExistingSchedule(o1.id);
- int execsScheduled2 =
existingScheduleFunc.getNumExistingSchedule(o2.id);
- if (execsScheduled1 > execsScheduled2) {
- return -1;
- } else if (execsScheduled1 < execsScheduled2) {
- return 1;
- } else {
- if (o1.minResourcePercent > o2.minResourcePercent) {
- return -1;
- } else if (o1.minResourcePercent <
o2.minResourcePercent) {
- return 1;
- } else {
- double diff = o1.avgResourcePercent -
o2.avgResourcePercent;
- if (diff > 0.0) {
- return -1;
- } else if (diff < 0.0) {
- return 1;
- } else {
- return o1.id.compareTo(o2.id);
- }
- }
- }
- });
+ Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+ int execsScheduled1 =
existingScheduleFunc.getNumExistingSchedule(o1.id);
+ int execsScheduled2 =
existingScheduleFunc.getNumExistingSchedule(o2.id);
+ if (execsScheduled1 > execsScheduled2) {
+ return -1;
+ } else if (execsScheduled1 < execsScheduled2) {
+ return 1;
+ }
+ if (o1.minResourcePercent > o2.minResourcePercent) {
+ return -1;
+ } else if (o1.minResourcePercent < o2.minResourcePercent) {
+ return 1;
+ }
+ double diff = o1.avgResourcePercent - o2.avgResourcePercent;
+ if (diff > 0.0) {
+ return -1;
+ } else if (diff < 0.0) {
+ return 1;
+ }
+ return o1.id.compareTo(o2.id);
+ };
+ TreeSet<ObjectResourcesItem> sortedObjectResources = new
TreeSet<>(comparator);
sortedObjectResources.addAll(allResources.getObjectResources());
LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
return sortedObjectResources;
@@ -371,28 +387,75 @@ public class NodeSorter implements INodeSorter {
* calculation, nodes nodes that have more balanced resource availability.
So we will be less likely to pick a node that have a lot of
* one resource but a low amount of another.
*
- * @param availRasNodes a list of all the nodes we want to sort
+ * @param availHosts a collection of all the hosts we want to sort
* @param rackId the rack id availNodes are a part of
- * @return a sorted list of nodes.
+ * @return an iterable of sorted hosts.
*/
- private TreeSet<ObjectResourcesItem> sortNodes(
- List<RasNode> availRasNodes, ExecutorDetails exec, String rackId,
+ private Iterable<ObjectResourcesItem> sortHosts(
+ Collection<String> availHosts, ExecutorDetails exec, String rackId,
Map<String, AtomicInteger> scheduledCount) {
ObjectResourcesSummary rackResourcesSummary = new
ObjectResourcesSummary("RACK");
+ availHosts.forEach(h -> {
+ ObjectResourcesItem hostItem = new ObjectResourcesItem(h);
+ for (RasNode x : hostnameToNodes.get(h)) {
+ hostItem.add(new ObjectResourcesItem(x.getId(),
x.getTotalAvailableResources(), x.getTotalResources(), 0, 0));
+ }
+ rackResourcesSummary.addObjectResourcesItem(hostItem);
+ });
+
+ LOG.debug(
+ "Rack {}: Overall Avail [ {} ] Total [ {} ]",
+ rackId,
+ rackResourcesSummary.getAvailableResourcesOverall(),
+ rackResourcesSummary.getTotalResourcesOverall());
+
+ return sortObjectResources(
+ rackResourcesSummary,
+ exec,
+ (hostId) -> {
+ AtomicInteger count = scheduledCount.get(hostId);
+ if (count == null) {
+ return 0;
+ }
+ return count.get();
+ });
+ }
+
+ /**
+ * Nodes are sorted by two criteria.
+ *
+ * <p>1) the number of executors of the topology being scheduled that are already on the node, in
+ * descending order. The reasoning for criterion 1 is that we schedule the rest of a topology on the same node as the
+ * existing executors of the topology.
+ *
+ * <p>2) the subordinate/subservient resource availability percentage of a node, in descending
+ * order. We calculate the resource availability percentage by dividing the resource availability on the node by
+ * the resource availability of the entire host. Nodes that have exhausted or have little of one of their resources
+ * will be ranked after nodes that have more balanced resource availability, so we will be less
+ * likely to pick a node that has a lot of one resource but a low amount of another.
+ *
+ * @param availRasNodes a list of all the nodes we want to sort
+ * @param hostId the host-id that availRasNodes are a part of
+ * @return an {@link Iterable} of sorted {@link ObjectResourcesItem} for
nodes.
+ */
+ private Iterable<ObjectResourcesItem> sortNodes(
+ List<RasNode> availRasNodes, ExecutorDetails exec, String hostId,
+ Map<String, AtomicInteger> scheduledCount) {
+ ObjectResourcesSummary hostResourcesSummary = new
ObjectResourcesSummary("HOST");
availRasNodes.forEach(x ->
- rackResourcesSummary.addObjectResourcesItem(
+ hostResourcesSummary.addObjectResourcesItem(
new ObjectResourcesItem(x.getId(),
x.getTotalAvailableResources(), x.getTotalResources(), 0, 0)
)
);
LOG.debug(
- "Rack {}: Overall Avail [ {} ] Total [ {} ]",
- rackId,
- rackResourcesSummary.getAvailableResourcesOverall(),
- rackResourcesSummary.getTotalResourcesOverall());
+ "Host {}: Overall Avail [ {} ] Total [ {} ]",
+ hostId,
+ hostResourcesSummary.getAvailableResourcesOverall(),
+ hostResourcesSummary.getTotalResourcesOverall());
return sortObjectResources(
- rackResourcesSummary,
+ hostResourcesSummary,
exec,
(superId) -> {
AtomicInteger count = scheduledCount.get(superId);
@@ -422,13 +485,14 @@ public class NodeSorter implements INodeSorter {
private class LazyNodeSortingIterator implements Iterator<String> {
private final LazyNodeSorting parent;
private final Iterator<ObjectResourcesItem> rackIterator;
+ private Iterator<ObjectResourcesItem> hostIterator;
private Iterator<ObjectResourcesItem> nodeIterator;
private String nextValueFromNode = null;
private final Iterator<String> pre;
private final Iterator<String> post;
private final Set<String> skip;
- LazyNodeSortingIterator(LazyNodeSorting parent,
TreeSet<ObjectResourcesItem> sortedRacks) {
+ LazyNodeSortingIterator(LazyNodeSorting parent,
Iterable<ObjectResourcesItem> sortedRacks) {
this.parent = parent;
rackIterator = sortedRacks.iterator();
pre = favoredNodeIds.iterator();
@@ -442,11 +506,20 @@ public class NodeSorter implements INodeSorter {
if (nodeIterator != null && nodeIterator.hasNext()) {
return nodeIterator;
}
- //need to get the next node iterator
+ //need to get the next host/node iterator
+ if (hostIterator != null && hostIterator.hasNext()) {
+ ObjectResourcesItem host = hostIterator.next();
+ final String hostId = host.id;
+ nodeIterator = parent.getSortedNodesForHost(hostId).iterator();
+ return nodeIterator;
+ }
if (rackIterator.hasNext()) {
ObjectResourcesItem rack = rackIterator.next();
final String rackId = rack.id;
- nodeIterator = parent.getSortedNodesFor(rackId).iterator();
+ hostIterator = parent.getSortedHostsForRack(rackId).iterator();
+ ObjectResourcesItem host = hostIterator.next();
+ final String hostId = host.id;
+ nodeIterator = parent.getSortedNodesForHost(hostId).iterator();
return nodeIterator;
}
@@ -494,9 +567,11 @@ public class NodeSorter implements INodeSorter {
}
private class LazyNodeSorting implements Iterable<String> {
+ private final Map<String, AtomicInteger> perHostScheduledCount = new
HashMap<>();
private final Map<String, AtomicInteger> perNodeScheduledCount = new
HashMap<>();
- private final TreeSet<ObjectResourcesItem> sortedRacks;
- private final Map<String, TreeSet<ObjectResourcesItem>> cachedNodes =
new HashMap<>();
+ private final Iterable<ObjectResourcesItem> sortedRacks;
+ private final Map<String, Iterable<ObjectResourcesItem>> cachedHosts =
new HashMap<>();
+ private final Map<String, Iterable<ObjectResourcesItem>>
cachedNodesByHost = new HashMap<>();
private final ExecutorDetails exec;
private final Set<String> skippedNodeIds = new HashSet<>();
@@ -512,16 +587,24 @@ public class NodeSorter implements INodeSorter {
for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry :
assignment.getSlotToExecutors().entrySet()) {
String superId = entry.getKey().getNodeId();
- perNodeScheduledCount.computeIfAbsent(superId, (sid) ->
new AtomicInteger(0))
+ String hostId = nodeIdToHostname.get(superId);
+ perHostScheduledCount.computeIfAbsent(hostId, id -> new
AtomicInteger(0))
+ .getAndAdd(entry.getValue().size());
+ perNodeScheduledCount.computeIfAbsent(superId, id -> new
AtomicInteger(0))
.getAndAdd(entry.getValue().size());
}
}
- sortedRacks = sortRacks(exec);
+ sortedRacks = getSortedRacks();
+ }
+
+ private Iterable<ObjectResourcesItem> getSortedHostsForRack(String
rackId) {
+ return cachedHosts.computeIfAbsent(rackId,
+ id -> sortHosts(rackIdToHosts.getOrDefault(id,
Collections.emptySet()), exec, id, perHostScheduledCount));
}
- private TreeSet<ObjectResourcesItem> getSortedNodesFor(String rackId) {
- return cachedNodes.computeIfAbsent(rackId,
- (rid) -> sortNodes(rackIdToNodes.getOrDefault(rid,
Collections.emptyList()), exec, rid, perNodeScheduledCount));
+ private Iterable<ObjectResourcesItem> getSortedNodesForHost(String
hostId) {
+ return cachedNodesByHost.computeIfAbsent(hostId,
+ id -> sortNodes(hostnameToNodes.getOrDefault(id,
Collections.emptyList()), exec, id, perNodeScheduledCount));
}
@Override
@@ -531,37 +614,38 @@ public class NodeSorter implements INodeSorter {
}
@Override
- public Iterable<String> sortAllNodes(ExecutorDetails exec) {
+ public Iterable<String> sortAllNodes() {
return new LazyNodeSorting(exec);
}
private ObjectResourcesSummary createClusterSummarizedResources() {
ObjectResourcesSummary clusterResourcesSummary = new
ObjectResourcesSummary("Cluster");
-
- //This is the first time so initialize the resources.
- for (Map.Entry<String, List<String>> entry :
networkTopography.entrySet()) {
- String rackId = entry.getKey();
- List<String> nodeHosts = entry.getValue();
- ObjectResourcesItem rack = new ObjectResourcesItem(rackId);
- for (String nodeHost : nodeHosts) {
- for (RasNode node : hostnameToNodes(nodeHost)) {
-
rack.availableResources.add(node.getTotalAvailableResources());
- rack.totalResources.add(node.getTotalAvailableResources());
+ rackIdToHosts.forEach((rackId, hostIds) -> {
+ if (hostIds == null || hostIds.isEmpty()) {
+ LOG.info("Ignoring Rack {} since it has no hosts", rackId);
+ } else {
+ ObjectResourcesItem rack = new ObjectResourcesItem(rackId);
+ for (String hostId : hostIds) {
+ for (RasNode node : hostnameToNodes(hostId)) {
+
rack.availableResources.add(node.getTotalAvailableResources());
+ rack.totalResources.add(node.getTotalResources());
+ }
}
+ clusterResourcesSummary.addObjectResourcesItem(rack);
}
- clusterResourcesSummary.addObjectResourcesItem(rack);
- }
+ });
LOG.debug(
- "Cluster Overall Avail [ {} ] Total [ {} ]",
+ "Cluster Overall Avail [ {} ] Total [ {} ], rackCnt={},
hostCnt={}",
clusterResourcesSummary.getAvailableResourcesOverall(),
- clusterResourcesSummary.getTotalResourcesOverall());
+ clusterResourcesSummary.getTotalResourcesOverall(),
+ clusterResourcesSummary.getObjectResources().size(),
+ rackIdToHosts.values().stream().mapToInt(x -> x.size()).sum());
return clusterResourcesSummary;
}
- private Map<String, AtomicInteger> getScheduledExecCntByRackId() {
- String topoId = topologyDetails.getId();
- SchedulerAssignment assignment = cluster.getAssignmentById(topoId);
+ public Map<String, AtomicInteger> getScheduledExecCntByRackId() {
+ SchedulerAssignment assignment =
cluster.getAssignmentById(topologyDetails.getId());
Map<String, AtomicInteger> scheduledCount = new HashMap<>();
if (assignment != null) {
for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry :
@@ -588,10 +672,9 @@ public class NodeSorter implements INodeSorter {
* racks that have more balanced resource availability. So we will be less
likely to pick a rack that have a lot of one resource but a
* low amount of another.
*
- * @return a sorted list of racks
+ * @return an iterable of sorted racks
*/
- @Override
- public TreeSet<ObjectResourcesItem> sortRacks(ExecutorDetails exec) {
+ public Iterable<ObjectResourcesItem> getSortedRacks() {
final ObjectResourcesSummary clusterResourcesSummary =
createClusterSummarizedResources();
final Map<String, AtomicInteger> scheduledCount =
getScheduledExecCntByRackId();
diff --git
a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index 11de468..dcffa99 100644
---
a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++
b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -189,13 +189,34 @@ public class TestUtilsForResourceAwareScheduler {
}
}
- public static Map<String, SupervisorDetails> genSupervisorsWithRacks(int
numRacks, int numSupersPerRack, int numPorts, int rackStart,
- int
superInRackStart, double cpu, double mem,
-
Map<String, Double> miscResources) {
- Map<String, Double> resourceMap = new HashMap<>();
- resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, cpu);
- resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem);
- resourceMap.putAll(miscResources);
+ public static Map<String, SupervisorDetails> genSupervisorsWithRacks(
+ int numRacks, int numSupersPerRack, int numPorts, int rackStart,
int superInRackStart,
+ double cpu, double mem, Map<String, Double> miscResources) {
+
+ return genSupervisorsWithRacksAndNuma(numRacks, numSupersPerRack, 1,
numPorts, rackStart,
+ superInRackStart, cpu, mem, miscResources, 1.0);
+ }
+
+ /**
+ * Takes one additional parameter numaZonesPerHost. This parameter
determines how many supervisors
+ * will be created on the same host. If numaResourceMultiplier is set to a
factor below 1.0, then
+ * each subsequent numa zone will have correspondingly lower cpu/mem than the previous numa zone.
+ *
+ * @param numRacks
+ * @param numSupersPerRack
+ * @param numaZonesPerHost
+ * @param numPorts
+ * @param rackStart
+ * @param superInRackStart
+ * @param cpu
+ * @param mem
+ * @param miscResources
+ * @param numaResourceMultiplier - cpu/mem resource for each numaZone is
multiplied by this factor to obtain uneven resources
+ * @return
+ */
+ public static Map<String, SupervisorDetails>
genSupervisorsWithRacksAndNuma(
+ int numRacks, int numSupersPerRack, int numaZonesPerHost, int
numPorts, int rackStart, int superInRackStart,
+ double cpu, double mem, Map<String, Double> miscResources, double
numaResourceMultiplier) {
Map<String, SupervisorDetails> retList = new HashMap<>();
for (int rack = rackStart; rack < numRacks + rackStart; rack++) {
for (int superInRack = superInRackStart; superInRack <
(numSupersPerRack + superInRackStart); superInRack++) {
@@ -203,8 +224,23 @@ public class TestUtilsForResourceAwareScheduler {
for (int p = 0; p < numPorts; p++) {
ports.add(p);
}
- SupervisorDetails sup = new
SupervisorDetails(String.format("r%03ds%03d", rack, superInRack),
- String.format("host-%03d-rack-%03d", superInRack, rack),
null, ports,
+ String superId;
+ String host;
+ int numaZone = superInRack % numaZonesPerHost;
+ if (numaZonesPerHost > 1) {
+ // multiple supervisors per host
+ int hostInRack = superInRack / numaZonesPerHost;
+ superId = String.format("r%03ds%03dn%d", rack,
superInRack, numaZone);
+ host = String.format("host-%03d-rack-%03d", hostInRack,
rack);
+ } else {
+ superId = String.format("r%03ds%03d", rack, superInRack);
+ host = String.format("host-%03d-rack-%03d", superInRack,
rack);
+ }
+ Map<String, Double> resourceMap = new HashMap<>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, cpu *
Math.pow(numaResourceMultiplier, numaZone));
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem *
Math.pow(numaResourceMultiplier, numaZone));
+ resourceMap.putAll(miscResources);
+ SupervisorDetails sup = new SupervisorDetails(superId, host,
null, ports,
NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceMap));
retList.put(sup.getId(), sup);
diff --git
a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
index 96a0f9a..fce3065 100644
---
a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
+++
b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
@@ -22,6 +22,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.config.Configurator;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
import org.apache.storm.scheduler.Cluster;
@@ -36,6 +39,7 @@ import
org.apache.storm.scheduler.resource.ResourceAwareScheduler;
import org.apache.storm.scheduler.resource.SchedulingResult;
import
org.apache.storm.scheduler.resource.strategies.scheduling.sorter.ExecSorterByConstraintSeverity;
import
org.apache.storm.scheduler.resource.strategies.scheduling.sorter.IExecSorter;
+import
org.apache.storm.scheduler.resource.strategies.scheduling.sorter.NodeSorterHostProximity;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.json.simple.JSONValue;
@@ -77,6 +81,12 @@ public class TestConstraintSolverStrategy {
public TestConstraintSolverStrategy(boolean consolidatedConfigFlag) {
this.consolidatedConfigFlag = consolidatedConfigFlag;
+ List<Class> classesToDebug =
Arrays.asList(TestConstraintSolverStrategy.class,
+ BaseResourceAwareStrategy.class, ResourceAwareScheduler.class,
+ NodeSorterHostProximity.class, Cluster.class
+ );
+ Level logLevel = Level.INFO ; // switch to Level.DEBUG for verbose
otherwise Level.INFO
+ classesToDebug.forEach(x -> Configurator.setLevel(x.getName(),
logLevel));
LOG.info("Running tests with consolidatedConfigFlag={}",
consolidatedConfigFlag);
}
@@ -180,15 +190,15 @@ public class TestConstraintSolverStrategy {
return makeTestTopoConf(1);
}
- public TopologyDetails makeTopology(Map<String, Object> config, int
boltParallel) {
+ public static TopologyDetails makeTopology(Map<String, Object> config, int
boltParallel) {
return genTopology("testTopo", config, 1, 4, 4, boltParallel, 0, 0,
"user");
}
- public Cluster makeCluster(Topologies topologies) {
+ public static Cluster makeCluster(Topologies topologies) {
return makeCluster(topologies, null);
}
- public Cluster makeCluster(Topologies topologies, Map<String,
SupervisorDetails> supMap) {
+ public static Cluster makeCluster(Topologies topologies, Map<String,
SupervisorDetails> supMap) {
if (supMap == null) {
supMap = genSupervisors(4, 2, 120, 1200);
}
diff --git
a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index 1b1c035..b6f2a20 100644
---
a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++
b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -44,6 +44,7 @@ import
org.apache.storm.scheduler.resource.ResourceAwareScheduler;
import org.apache.storm.scheduler.resource.SchedulingResult;
import
org.apache.storm.scheduler.resource.strategies.scheduling.sorter.INodeSorter;
import
org.apache.storm.scheduler.resource.strategies.scheduling.sorter.NodeSorter;
+import
org.apache.storm.scheduler.resource.strategies.scheduling.sorter.NodeSorterHostProximity;
import org.apache.storm.topology.SharedOffHeapWithinNode;
import org.apache.storm.topology.SharedOffHeapWithinWorker;
import org.apache.storm.topology.SharedOnHeap;
@@ -74,7 +75,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.TreeSet;
+import java.util.stream.Collectors;
+
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
@@ -373,6 +375,15 @@ public class TestDefaultResourceAwareStrategy {
SchedulerAssignment assignment =
cluster.getAssignmentById(topo.getId());
TopologyResources topologyResources =
cluster.getTopologyResourcesMap().get(topo.getId());
long numNodes =
assignment.getSlotToExecutors().keySet().stream().map(WorkerSlot::getNodeId).distinct().count();
+ String assignmentString = "Assignments:\n\t" +
assignment.getSlotToExecutors().entrySet().stream()
+ .map(x -> String.format("Node=%s, components=%s",
+ x.getKey().getNodeId(),
+ x.getValue().stream()
+ .map(y -> topo.getComponentFromExecutor(y))
+ .collect(Collectors.joining(","))
+ )
+ )
+ .collect(Collectors.joining("\n\t"));
if (schedulingLimitation ==
WorkerRestrictionType.WORKER_RESTRICTION_NONE) {
// Everything should fit in a single slot
@@ -411,7 +422,7 @@ public class TestDefaultResourceAwareStrategy {
int numAssignedWorkers = cluster.getAssignedNumWorkers(topo);
assertThat(numAssignedWorkers, is(8));
assertThat(assignment.getSlots().size(), is(8));
- assertThat(numNodes, is(2L));
+ assertThat(assignmentString, numNodes, is(2L));
} else if (schedulingLimitation ==
WorkerRestrictionType.WORKER_RESTRICTION_ONE_COMPONENT) {
double expectedMemOnHeap = (totalNumberOfTasks * memoryOnHeap) +
sharedOnHeapWithinWorker;
double expectedMemOffHeap = (totalNumberOfTasks * memoryOffHeap) +
sharedOffHeapWithinWorker + sharedOffHeapWithinNode;
@@ -766,22 +777,17 @@ public class TestDefaultResourceAwareStrategy {
for (Map.Entry<String, String> entry : resolvedSuperVisors.entrySet())
{
String hostName = entry.getKey();
String rack = entry.getValue();
- List<String> nodesForRack = rackToNodes.get(rack);
- if (nodesForRack == null) {
- nodesForRack = new ArrayList<>();
- rackToNodes.put(rack, nodesForRack);
- }
- nodesForRack.add(hostName);
+ rackToNodes.computeIfAbsent(rack, rid -> new
ArrayList<>()).add(hostName);
}
cluster.setNetworkTopography(rackToNodes);
DefaultResourceAwareStrategyOld rs = new
DefaultResourceAwareStrategyOld();
rs.prepareForScheduling(cluster, topo1);
- INodeSorter nodeSorter = new NodeSorter(cluster, topo1,
BaseResourceAwareStrategy.NodeSortType.DEFAULT_RAS);
- TreeSet<ObjectResourcesItem> sortedRacks = nodeSorter.sortRacks(null);
+ INodeSorter nodeSorter = new NodeSorterHostProximity(cluster, topo1,
BaseResourceAwareStrategy.NodeSortType.DEFAULT_RAS);
+ nodeSorter.prepare(null);
+ Iterable<ObjectResourcesItem> sortedRacks =
nodeSorter.getSortedRacks();
- Assert.assertEquals("# of racks sorted", 6, sortedRacks.size());
Iterator<ObjectResourcesItem> it = sortedRacks.iterator();
// Ranked first since rack-0 has the most balanced set of resources
Assert.assertEquals("rack-0 should be ordered first", "rack-0",
it.next().id);
@@ -904,10 +910,10 @@ public class TestDefaultResourceAwareStrategy {
DefaultResourceAwareStrategyOld rs = new
DefaultResourceAwareStrategyOld();
rs.prepareForScheduling(cluster, topo1);
- INodeSorter nodeSorter = new NodeSorter(cluster, topo1,
BaseResourceAwareStrategy.NodeSortType.DEFAULT_RAS);
- TreeSet<ObjectResourcesItem> sortedRacks= nodeSorter.sortRacks(null);
+ INodeSorter nodeSorter = new NodeSorterHostProximity(cluster, topo1,
BaseResourceAwareStrategy.NodeSortType.DEFAULT_RAS);
+ nodeSorter.prepare(null);
+ Iterable<ObjectResourcesItem> sortedRacks= nodeSorter.getSortedRacks();
- Assert.assertEquals("# of racks sorted", 5, sortedRacks.size());
Iterator<ObjectResourcesItem> it = sortedRacks.iterator();
// Ranked first since rack-0 has the most balanced set of resources
Assert.assertEquals("rack-0 should be ordered first", "rack-0",
it.next().id);
diff --git
a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/TestNodeSorterHostProximity.java
b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/TestNodeSorterHostProximity.java
new file mode 100644
index 0000000..84b839f
--- /dev/null
+++
b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/TestNodeSorterHostProximity.java
@@ -0,0 +1,1036 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling.sorter;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.RasNodes;
+import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler;
+import
org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import
org.apache.storm.scheduler.resource.normalization.NormalizedResourcesExtension;
+import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
+import
org.apache.storm.scheduler.resource.strategies.scheduling.BaseResourceAwareStrategy;
+import
org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
+import
org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategy;
+import
org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesItem;
+import org.apache.storm.topology.TopologyBuilder;
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static
org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+@ExtendWith({NormalizedResourcesExtension.class})
+public class TestNodeSorterHostProximity {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestNodeSorterHostProximity.class);
+ private static final int CURRENT_TIME = 1450418597;
+
+ protected Class getDefaultResourceAwareStrategyClass() {
+ return DefaultResourceAwareStrategy.class;
+ }
+
+ private Config createClusterConfig(double compPcore, double compOnHeap,
double compOffHeap,
+ Map<String, Map<String, Number>> pools)
{
+ Config config =
TestUtilsForResourceAwareScheduler.createClusterConfig(compPcore, compOnHeap,
compOffHeap, pools);
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY,
getDefaultResourceAwareStrategyClass().getName());
+ return config;
+ }
+
+ private static class TestDNSToSwitchMapping implements DNSToSwitchMapping {
+ private final Map<String, String> hostToRackMap;
+ private final Map<String, List<String>> rackToHosts;
+
+ @SafeVarargs
+ public TestDNSToSwitchMapping(Map<String, SupervisorDetails>... racks)
{
+ Set<String> seenHosts = new HashSet<>();
+ Map<String, String> hostToRackMap = new HashMap<>();
+ Map<String, List<String>> rackToHosts = new HashMap<>();
+ for (int rackNum = 0; rackNum < racks.length; rackNum++) {
+ String rack = String.format("rack-%03d", rackNum);
+ for (SupervisorDetails sup : racks[rackNum].values()) {
+ hostToRackMap.put(sup.getHost(), rack);
+ String host = sup.getHost();
+ if (!seenHosts.contains(host)) {
+ rackToHosts.computeIfAbsent(rack, rid -> new
ArrayList<>()).add(host);
+ seenHosts.add(host);
+ }
+ }
+ }
+ this.hostToRackMap = Collections.unmodifiableMap(hostToRackMap);
+ this.rackToHosts = Collections.unmodifiableMap(rackToHosts);
+ }
+
+ /**
+ * Use the "rack-%03d" embedded in the name of the supervisor to
determine the rack number.
+ *
+ * @param supervisorDetailsCollection supervisors whose ids are parsed to obtain the rack name
+ */
+ public TestDNSToSwitchMapping(Collection<SupervisorDetails>
supervisorDetailsCollection) {
+ Set<String> seenHosts = new HashSet<>();
+ Map<String, String> hostToRackMap = new HashMap<>();
+ Map<String, List<String>> rackToHosts = new HashMap<>();
+
+ for (SupervisorDetails supervisorDetails:
supervisorDetailsCollection) {
+ String rackId =
supervisorIdToRackName(supervisorDetails.getId());
+ hostToRackMap.put(supervisorDetails.getHost(), rackId);
+ String host = supervisorDetails.getHost();
+ if (!seenHosts.contains(host)) {
+ rackToHosts.computeIfAbsent(rackId, rid -> new
ArrayList<>()).add(host);
+ seenHosts.add(host);
+ }
+ }
+ this.hostToRackMap = Collections.unmodifiableMap(hostToRackMap);
+ this.rackToHosts = Collections.unmodifiableMap(rackToHosts);
+ }
+
+ @Override
+ public Map<String, String> resolve(List<String> names) {
+ return hostToRackMap;
+ }
+
+ public Map<String, List<String>> getRackToHosts() {
+ return rackToHosts;
+ }
+ }
+
+ /**
+ * Test whether strategy will choose correct rack.
+ */
+ @Test
+ public void testMultipleRacks() {
+ final Map<String, SupervisorDetails> supMap = new HashMap<>();
+ final int numRacks = 1;
+ final int numSupersPerRack = 10;
+ final int numPortsPerSuper = 4;
+ final int numZonesPerHost = 1;
+ final double numaResourceMultiplier = 1.0;
+ int rackStartNum = 0;
+ int supStartNum = 0;
+
+ final Map<String, SupervisorDetails> supMapRack0 =
genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper,
rackStartNum++, supStartNum,
+ 400, 8000, Collections.emptyMap(), numaResourceMultiplier);
+
+ //generate another rack of supervisors with less resources
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack1 =
genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper,
rackStartNum++, supStartNum,
+ 200, 4000, Collections.emptyMap(), numaResourceMultiplier);
+
+ //generate some supervisors that are depleted of one resource
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack2 =
genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper,
rackStartNum++, supStartNum,
+ 0, 8000, Collections.emptyMap(), numaResourceMultiplier);
+
+ //generate some that has a lot of memory but little of cpu
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack3 =
genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper,
rackStartNum++, supStartNum,
+ 10, 8000 * 2 + 4000,
Collections.emptyMap(),numaResourceMultiplier);
+
+ //generate some that has a lot of cpu but little of memory
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack4 =
genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper,
rackStartNum++, supStartNum,
+ 400 + 200 + 10, 1000, Collections.emptyMap(),
numaResourceMultiplier);
+
+ //Generate some that have neither resource, to verify that the
strategy will prioritize this last
+ //Also put a generic resource with 0 value in the resources list, to
verify that it doesn't affect the sorting
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack5 =
genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper,
rackStartNum++, supStartNum,
+ 0.0, 0.0, Collections.singletonMap("gpu.count", 0.0),
numaResourceMultiplier);
+
+ supMap.putAll(supMapRack0);
+ supMap.putAll(supMapRack1);
+ supMap.putAll(supMapRack2);
+ supMap.putAll(supMapRack3);
+ supMap.putAll(supMapRack4);
+ supMap.putAll(supMapRack5);
+
+ Config config = createClusterConfig(100, 500, 500, null);
+ config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+ INimbus iNimbus = new INimbusTest();
+
+ //create test DNSToSwitchMapping plugin
+ TestDNSToSwitchMapping testDNSToSwitchMapping =
+ new TestDNSToSwitchMapping(supMapRack0, supMapRack1,
supMapRack2, supMapRack3, supMapRack4, supMapRack5);
+
+ //generate topologies
+ TopologyDetails topo1 = genTopology("topo-1", config, 8, 0, 2, 0,
CURRENT_TIME - 2, 10, "user");
+ TopologyDetails topo2 = genTopology("topo-2", config, 8, 0, 2, 0,
CURRENT_TIME - 2, 10, "user");
+
+ Topologies topologies = new Topologies(topo1, topo2);
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new
StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+
+ List<String> supHostnames = new LinkedList<>();
+ for (SupervisorDetails sup : supMap.values()) {
+ supHostnames.add(sup.getHost());
+ }
+ Map<String, List<String>> rackToHosts =
testDNSToSwitchMapping.getRackToHosts();
+ cluster.setNetworkTopography(rackToHosts);
+
+ NodeSorterHostProximity nodeSorter = new
NodeSorterHostProximity(cluster, topo1,
BaseResourceAwareStrategy.NodeSortType.DEFAULT_RAS);
+ nodeSorter.prepare(null);
+ List<ObjectResourcesItem> sortedRacks =
StreamSupport.stream(nodeSorter.getSortedRacks().spliterator(), false)
+ .collect(Collectors.toList());
+ String rackSummaries = sortedRacks.stream()
+ .map(x -> String.format("Rack %s -> scheduled-cnt %d,
min-avail %f, avg-avail %f, cpu %f, mem %f",
+ x.id,
nodeSorter.getScheduledExecCntByRackId().getOrDefault(x.id, new
AtomicInteger(-1)).get(),
+ x.minResourcePercent, x.avgResourcePercent,
+ x.availableResources.getTotalCpu(),
+ x.availableResources.getTotalMemoryMb()))
+ .collect(Collectors.joining("\n\t"));
+ Assert.assertEquals(rackSummaries + "\n# of racks sorted", 6,
sortedRacks.size());
+ Iterator<ObjectResourcesItem> it = sortedRacks.iterator();
+ Assert.assertEquals(rackSummaries + "\nrack-000 should be ordered
first since it has the most balanced set of resources", "rack-000",
it.next().id);
+ Assert.assertEquals(rackSummaries + "\nrack-001 should be ordered
second since it has a balanced set of resources but less than rack-000",
"rack-001", it.next().id);
+ Assert.assertEquals(rackSummaries + "\nrack-004 should be ordered
third since it has a lot of cpu but not a lot of memory", "rack-004",
it.next().id);
+ Assert.assertEquals(rackSummaries + "\nrack-003 should be ordered
fourth since it has a lot of memory but not cpu", "rack-003", it.next().id);
+ Assert.assertEquals(rackSummaries + "\nrack-002 should be ordered
fifth since it has not cpu resources", "rack-002", it.next().id);
+ Assert.assertEquals(rackSummaries + "\nRack-005 should be ordered
sixth since it has neither CPU nor memory available", "rack-005", it.next().id);
+ }
+
+ /**
+ * Test whether strategy will choose correct rack when the topology sets favored and unfavored nodes.
+ */
+ @Test
+ public void testMultipleRacksWithFavoritism() {
+ final Map<String, SupervisorDetails> supMap = new HashMap<>();
+ final int numRacks = 1;
+ final int numSupersPerRack = 10;
+ final int numPortsPerSuper = 4;
+ final int numZonesPerHost = 2;
+ int rackStartNum = 0;
+ int supStartNum = 0;
+ final Map<String, SupervisorDetails> supMapRack0 =
genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper,
rackStartNum++, supStartNum,
+ 400, 8000, Collections.emptyMap(), 1.0);
+
+ //generate another rack of supervisors with less resources
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack1 =
genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper,
rackStartNum++, supStartNum,
+ 200, 4000, Collections.emptyMap(), 1.0);
+
+ //generate some supervisors that are depleted of one resource
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack2 =
genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper,
rackStartNum++, supStartNum,
+ 0, 8000, Collections.emptyMap(), 1.0);
+
+ //generate some that has a lot of memory but little of cpu
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack3 =
genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper,
rackStartNum++, supStartNum,
+ 10, 8000 * 2 + 4000, Collections.emptyMap(), 1.0);
+
+ //generate some that has a lot of cpu but little of memory
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack4 =
genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper,
rackStartNum++, supStartNum,
+ 400 + 200 + 10, 1000, Collections.emptyMap(), 1.0);
+
+ supMap.putAll(supMapRack0);
+ supMap.putAll(supMapRack1);
+ supMap.putAll(supMapRack2);
+ supMap.putAll(supMapRack3);
+ supMap.putAll(supMapRack4);
+
+ Config config = createClusterConfig(100, 500, 500, null);
+ config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+ INimbus iNimbus = new INimbusTest();
+
+ //create test DNSToSwitchMapping plugin
+ TestDNSToSwitchMapping testDNSToSwitchMapping =
+ new TestDNSToSwitchMapping(supMapRack0, supMapRack1,
supMapRack2, supMapRack3, supMapRack4);
+
+ Config t1Conf = new Config();
+ t1Conf.putAll(config);
+ final List<String> t1FavoredHostNames = Arrays.asList("host-41",
"host-42", "host-43");
+ t1Conf.put(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES,
t1FavoredHostNames);
+ final List<String> t1UnfavoredHostIds = Arrays.asList("host-1",
"host-2", "host-3");
+ t1Conf.put(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES,
t1UnfavoredHostIds);
+ //generate topologies
+ TopologyDetails topo1 = genTopology("topo-1", t1Conf, 8, 0, 2, 0,
CURRENT_TIME - 2, 10, "user");
+
+
+ Config t2Conf = new Config();
+ t2Conf.putAll(config);
+ t2Conf.put(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES,
Arrays.asList("host-31", "host-32", "host-33"));
+ t2Conf.put(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES,
Arrays.asList("host-11", "host-12", "host-13"));
+ TopologyDetails topo2 = genTopology("topo-2", t2Conf, 8, 0, 2, 0,
CURRENT_TIME - 2, 10, "user");
+
+ Topologies topologies = new Topologies(topo1, topo2);
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new
StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+
+ List<String> supHostnames = new LinkedList<>();
+ for (SupervisorDetails sup : supMap.values()) {
+ supHostnames.add(sup.getHost());
+ }
+ Map<String, List<String>> rackToHosts =
testDNSToSwitchMapping.getRackToHosts();
+ cluster.setNetworkTopography(rackToHosts);
+
+ NodeSorterHostProximity nodeSorter = new
NodeSorterHostProximity(cluster, topo1,
BaseResourceAwareStrategy.NodeSortType.DEFAULT_RAS);
+ nodeSorter.prepare(null);
+ List<ObjectResourcesItem> sortedRacks =
StreamSupport.stream(nodeSorter.getSortedRacks().spliterator(), false)
+ .collect(Collectors.toList());
+ String rackSummaries = sortedRacks.stream()
+ .map(x -> String.format("Rack %s -> scheduled-cnt %d,
min-avail %f, avg-avail %f, cpu %f, mem %f",
+ x.id,
nodeSorter.getScheduledExecCntByRackId().getOrDefault(x.id, new
AtomicInteger(-1)).get(),
+ x.minResourcePercent, x.avgResourcePercent,
+ x.availableResources.getTotalCpu(),
+ x.availableResources.getTotalMemoryMb()))
+ .collect(Collectors.joining("\n\t"));
+
+ Iterator<ObjectResourcesItem> it = sortedRacks.iterator();
+ // Ranked first since rack-000 has the most balanced set of resources
+ Assert.assertEquals("rack-000 should be ordered first", "rack-000",
it.next().id);
+ // Ranked second since rack-001 has a balanced set of resources but less
than rack-000
+ Assert.assertEquals("rack-001 should be ordered second", "rack-001",
it.next().id);
+ // Ranked third since rack-004 has a lot of cpu but not a lot of memory
+ Assert.assertEquals("rack-004 should be ordered third", "rack-004",
it.next().id);
+ // Ranked fourth since rack-003 has a lot of memory but not cpu
+ Assert.assertEquals("rack-003 should be ordered fourth", "rack-003",
it.next().id);
+ // Ranked last since rack-002 has no cpu resources
+ Assert.assertEquals("rack-00s2 should be ordered fifth", "rack-002",
it.next().id);
+ }
+
+ /**
+ * Test if hosts are presented together regardless of resource
availability.
+ * Supervisors are created with multiple Numa zones in such a manner that
resources on two numa zones on the same host
+ * differ widely in resource availability.
+ */
+ @Test
+ public void testMultipleRacksWithHostProximity() {
+ final Map<String, SupervisorDetails> supMap = new HashMap<>();
+ final int numRacks = 1;
+ final int numSupersPerRack = 12;
+ final int numPortsPerSuper = 4;
+ final int numZonesPerHost = 3;
+ final double numaResourceMultiplier = 0.4;
+ int rackStartNum = 0;
+ int supStartNum = 0;
+
+ final Map<String, SupervisorDetails> supMapRack0 =
genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper,
rackStartNum++, supStartNum,
+ 400, 8000, Collections.emptyMap(), numaResourceMultiplier);
+
+ //generate another rack of supervisors with less resources
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack1 =
genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper,
rackStartNum++, supStartNum,
+ 200, 4000, Collections.emptyMap(), numaResourceMultiplier);
+
+ //generate some supervisors that are depleted of one resource
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack2 =
genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper,
rackStartNum++, supStartNum,
+ 0, 8000, Collections.emptyMap(), numaResourceMultiplier);
+
+ //generate some that has a lot of memory but little of cpu
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack3 =
genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper,
rackStartNum++, supStartNum,
+ 10, 8000 * 2 + 4000,
Collections.emptyMap(),numaResourceMultiplier);
+
+ //generate some that has a lot of cpu but little of memory
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack4 =
genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper,
rackStartNum++, supStartNum,
+ 400 + 200 + 10, 1000, Collections.emptyMap(),
numaResourceMultiplier);
+
+ supMap.putAll(supMapRack0);
+ supMap.putAll(supMapRack1);
+ supMap.putAll(supMapRack2);
+ supMap.putAll(supMapRack3);
+ supMap.putAll(supMapRack4);
+
+ Config config = createClusterConfig(100, 500, 500, null);
+ config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+ INimbus iNimbus = new INimbusTest();
+
+ //create test DNSToSwitchMapping plugin
+ TestDNSToSwitchMapping testDNSToSwitchMapping =
+ new TestDNSToSwitchMapping(supMapRack0, supMapRack1,
supMapRack2, supMapRack3, supMapRack4);
+
+ Config t1Conf = new Config();
+ t1Conf.putAll(config);
+ final List<String> t1FavoredHostNames = Arrays.asList("host-41",
"host-42", "host-43");
+ t1Conf.put(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES,
t1FavoredHostNames);
+ final List<String> t1UnfavoredHostIds = Arrays.asList("host-1",
"host-2", "host-3");
+ t1Conf.put(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES,
t1UnfavoredHostIds);
+ //generate topologies
+ TopologyDetails topo1 = genTopology("topo-1", t1Conf, 8, 0, 2, 0,
CURRENT_TIME - 2, 10, "user");
+
+
+ Config t2Conf = new Config();
+ t2Conf.putAll(config);
+ t2Conf.put(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES,
Arrays.asList("host-31", "host-32", "host-33"));
+ t2Conf.put(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES,
Arrays.asList("host-11", "host-12", "host-13"));
+ TopologyDetails topo2 = genTopology("topo-2", t2Conf, 8, 0, 2, 0,
CURRENT_TIME - 2, 10, "user");
+
+ Topologies topologies = new Topologies(topo1, topo2);
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new
StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+
+ cluster.setNetworkTopography(testDNSToSwitchMapping.getRackToHosts());
+
+ INodeSorter nodeSorter = new NodeSorterHostProximity(cluster, topo1);
+ nodeSorter.prepare(null);
+
+ Set<String> seenHosts = new HashSet<>();
+ String prevHost = null;
+ List<String> errLines = new ArrayList();
+ Map<String, String> nodeToHost = new
RasNodes(cluster).getNodeIdToHostname();
+ for (String nodeId: nodeSorter.sortAllNodes()) {
+ String host = nodeToHost.getOrDefault(nodeId, "no-host-for-node-"
+ nodeId);
+ errLines.add(String.format("\tnodeId:%s, host:%s", nodeId, host));
+ if (!host.equals(prevHost) && seenHosts.contains(host)) {
+ String err = String.format("Host %s for node %s is out of
order:\n\t%s", host, nodeId, String.join("\n\t", errLines));
+ Assert.fail(err);
+ }
+ seenHosts.add(host);
+ prevHost = host;
+ }
+ }
+
+ /**
+ * Racks should be returned in order of decreasing capacity.
+ */
+ @Test
+ public void testMultipleRacksOrderedByCapacity() {
+ final Map<String, SupervisorDetails> supMap = new HashMap<>();
+ final int numRacks = 1;
+ final int numSupersPerRack = 10;
+ final int numPortsPerSuper = 4;
+ final int numZonesPerHost = 1;
+ final double numaResourceMultiplier = 1.0;
+ int rackStartNum = 0;
+ int supStartNum = 0;
+
+ final Map<String, SupervisorDetails> supMapRack0 =
genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper,
rackStartNum++, supStartNum,
+ 600, 8000 - rackStartNum, Collections.emptyMap(),
numaResourceMultiplier);
+
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack1 =
genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper,
rackStartNum++, supStartNum,
+ 500, 8000 - rackStartNum, Collections.emptyMap(),
numaResourceMultiplier);
+
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack2 =
genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper,
rackStartNum++, supStartNum,
+ 400, 8000 - rackStartNum, Collections.emptyMap(),
numaResourceMultiplier);
+
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack3 =
genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper,
rackStartNum++, supStartNum,
+ 300, 8000 - rackStartNum,
Collections.emptyMap(),numaResourceMultiplier);
+
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack4 =
genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper,
rackStartNum++, supStartNum,
+ 200, 8000 - rackStartNum, Collections.emptyMap(),
numaResourceMultiplier);
+
+ // too small to hold topology
+ supStartNum += numSupersPerRack;
+ final Map<String, SupervisorDetails> supMapRack5 =
genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper,
rackStartNum++, supStartNum,
+ 100, 8000 - rackStartNum,
Collections.singletonMap("gpu.count", 0.0), numaResourceMultiplier);
+
+ supMap.putAll(supMapRack0);
+ supMap.putAll(supMapRack1);
+ supMap.putAll(supMapRack2);
+ supMap.putAll(supMapRack3);
+ supMap.putAll(supMapRack4);
+ supMap.putAll(supMapRack5);
+
+ Config config = createClusterConfig(100, 500, 500, null);
+ config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+ INimbus iNimbus = new INimbusTest();
+
+ //create test DNSToSwitchMapping plugin
+ TestDNSToSwitchMapping testDNSToSwitchMapping =
+ new TestDNSToSwitchMapping(supMapRack0, supMapRack1,
supMapRack2, supMapRack3, supMapRack4, supMapRack5);
+
+ //generate topologies
+ TopologyDetails topo1 = genTopology("topo-1", config, 8, 0, 2, 0,
CURRENT_TIME - 2, 10, "user");
+ TopologyDetails topo2 = genTopology("topo-2", config, 8, 0, 2, 0,
CURRENT_TIME - 2, 10, "user");
+
+ Topologies topologies = new Topologies(topo1, topo2);
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new
StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+
+ cluster.setNetworkTopography(testDNSToSwitchMapping.getRackToHosts());
+
+ NodeSorterHostProximity nodeSorter = new
NodeSorterHostProximity(cluster, topo1);
+ nodeSorter.prepare(null);
+ List<ObjectResourcesItem> sortedRacks =
StreamSupport.stream(nodeSorter.getSortedRacks().spliterator(), false)
+ .collect(Collectors.toList());
+ String rackSummaries = sortedRacks
+ .stream()
+ .map(x -> String.format("Rack %s -> scheduled-cnt %d,
min-avail %f, avg-avail %f, cpu %f, mem %f",
+ x.id,
nodeSorter.getScheduledExecCntByRackId().getOrDefault(x.id, new
AtomicInteger(-1)).get(),
+ x.minResourcePercent, x.avgResourcePercent,
+ x.availableResources.getTotalCpu(),
+ x.availableResources.getTotalMemoryMb()))
+ .collect(Collectors.joining("\n\t"));
+ NormalizedResourceRequest topoResourceRequest =
topo1.getApproximateTotalResources();
+ String topoRequest = String.format("Topo %s,
approx-requested-resources %s", topo1.getId(), topoResourceRequest.toString());
+ Iterator<ObjectResourcesItem> it = sortedRacks.iterator();
+ Assert.assertEquals(topoRequest + "\n\t" + rackSummaries + "\nRack-000
should be ordered first since it has the largest capacity", "rack-000",
it.next().id);
+ Assert.assertEquals(topoRequest + "\n\t" + rackSummaries + "\nrack-001
should be ordered second since it smaller than rack-000", "rack-001",
it.next().id);
+ Assert.assertEquals(topoRequest + "\n\t" + rackSummaries + "\nrack-002
should be ordered third since it is smaller than rack-001", "rack-002",
it.next().id);
+ Assert.assertEquals(topoRequest + "\n\t" + rackSummaries + "\nrack-003
should be ordered fourth since it since it is smaller than rack-002",
"rack-003", it.next().id);
+ Assert.assertEquals(topoRequest + "\n\t" + rackSummaries + "\nrack-004
should be ordered fifth since it since it is smaller than rack-003",
"rack-004", it.next().id);
+ Assert.assertEquals(topoRequest + "\n\t" + rackSummaries + "\nrack-005
should be ordered last since it since it is has smallest capacity", "rack-005",
it.next().id);
+ }
+
+    /**
+     * Schedule two topologies, one with special resources and another without.
+     * There are enough special resources to hold one topology with special resource ("my.gpu").
+     * If the sort order is incorrect, scheduling will not succeed.
+     *
+     * <p>Before each scheduling pass, the rack order returned by
+     * {@link NodeSorterHostProximity#getSortedRacks()} is checked for every executor of the
+     * topology that is about to be scheduled.
+     */
+    @Test
+    public void testAntiAffinityWithMultipleTopologies() {
+        INimbus iNimbus = new INimbusTest();
+        // First group of supervisors carries no extra resources.
+        Map<String, SupervisorDetails> supMap = genSupervisorsWithRacks(1, 40, 66, 0, 0, 4700, 226200, new HashMap<>());
+        // Second group (presumably rack start index 1) additionally offers "my.gpu".
+        HashMap<String, Double> extraResources = new HashMap<>();
+        extraResources.put("my.gpu", 1.0);
+        supMap.putAll(genSupervisorsWithRacks(1, 40, 66, 1, 0, 4700, 226200, extraResources));
+
+        Config config = new Config();
+        config.putAll(createGrasClusterConfig(88, 775, 25, null, null));
+
+        IScheduler scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(config, new StormMetricsRegistry());
+
+        TopologyDetails tdSimple = genTopology("topology-simple", config, 1,
+            5, 100, 300, 0, 0, "user", 8192);
+
+        //Schedule the simple topology first
+        Topologies topologies = new Topologies(tdSimple);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()),
+            supMap, new HashMap<>(), topologies, config);
+
+        {
+            NodeSorterHostProximity nodeSorter = new NodeSorterHostProximity(cluster, tdSimple);
+            for (ExecutorDetails exec : tdSimple.getExecutors()) {
+                nodeSorter.prepare(exec);
+                List<ObjectResourcesItem> sortedRacks = StreamSupport
+                    .stream(nodeSorter.getSortedRacks().spliterator(), false)
+                    .collect(Collectors.toList());
+                String rackSummaries = sortedRacks.stream()
+                    .map(x -> String.format("Rack %s -> scheduled-cnt %d, min-avail %f, avg-avail %f, cpu %f, mem %f",
+                        x.id, nodeSorter.getScheduledExecCntByRackId().getOrDefault(x.id, new AtomicInteger(-1)).get(),
+                        x.minResourcePercent, x.avgResourcePercent,
+                        x.availableResources.getTotalCpu(),
+                        x.availableResources.getTotalMemoryMb()))
+                    .collect(Collectors.joining("\n\t"));
+                NormalizedResourceRequest topoResourceRequest = tdSimple.getApproximateTotalResources();
+                String topoRequest = String.format("Topo %s, approx-requested-resources %s",
+                    tdSimple.getId(), topoResourceRequest.toString());
+                // Include the topology request in failure messages, like the other rack-order tests do.
+                String failureContext = topoRequest + "\n\t" + rackSummaries;
+                Assert.assertEquals(failureContext + "\n# of racks sorted", 2, sortedRacks.size());
+                Assert.assertEquals(failureContext + "\nFirst rack sorted", "rack-000", sortedRacks.get(0).id);
+                Assert.assertEquals(failureContext + "\nSecond rack sorted", "rack-001", sortedRacks.get(1).id);
+            }
+        }
+
+        scheduler.schedule(topologies, cluster);
+
+        TopologyBuilder builder = topologyBuilder(1, 5, 100, 300);
+        builder.setBolt("gpu-bolt", new TestBolt(), 40)
+            .addResource("my.gpu", 1.0)
+            .shuffleGrouping("spout-0");
+        TopologyDetails tdGpu = topoToTopologyDetails("topology-gpu", config, builder.createTopology(), 0, 0, "user", 8192);
+
+        //Now schedule GPU but with the simple topology in place.
+        topologies = new Topologies(tdSimple, tdGpu);
+        cluster = new Cluster(cluster, topologies);
+        {
+            NodeSorterHostProximity nodeSorter = new NodeSorterHostProximity(cluster, tdGpu);
+            for (ExecutorDetails exec : tdGpu.getExecutors()) {
+                String comp = tdGpu.getComponentFromExecutor(exec);
+                nodeSorter.prepare(exec);
+                List<ObjectResourcesItem> sortedRacks = StreamSupport
+                    .stream(nodeSorter.getSortedRacks().spliterator(), false)
+                    .collect(Collectors.toList());
+                String rackSummaries = sortedRacks.stream()
+                    .map(x -> String.format("Rack %s -> scheduled-cnt %d, min-avail %f, avg-avail %f, cpu %f, mem %f",
+                        x.id, nodeSorter.getScheduledExecCntByRackId().getOrDefault(x.id, new AtomicInteger(-1)).get(),
+                        x.minResourcePercent, x.avgResourcePercent,
+                        x.availableResources.getTotalCpu(),
+                        x.availableResources.getTotalMemoryMb()))
+                    .collect(Collectors.joining("\n\t"));
+                // Describe the topology actually being sorted here (the GPU topology, not the simple one).
+                NormalizedResourceRequest topoResourceRequest = tdGpu.getApproximateTotalResources();
+                String topoRequest = String.format("Topo %s, approx-requested-resources %s",
+                    tdGpu.getId(), topoResourceRequest.toString());
+                String failureContext = topoRequest + "\n\t" + rackSummaries;
+                Assert.assertEquals(failureContext + "\n# of racks sorted", 2, sortedRacks.size());
+                if ("gpu-bolt".equals(comp)) {
+                    // Only the second rack offers "my.gpu", so it must be sorted first for the GPU bolt.
+                    Assert.assertEquals(failureContext + "\nFirst rack sorted for " + comp, "rack-001", sortedRacks.get(0).id);
+                    Assert.assertEquals(failureContext + "\nSecond rack sorted for " + comp, "rack-000", sortedRacks.get(1).id);
+                } else {
+                    Assert.assertEquals(failureContext + "\nFirst rack sorted for " + comp, "rack-000", sortedRacks.get(0).id);
+                    Assert.assertEquals(failureContext + "\nSecond rack sorted for " + comp, "rack-001", sortedRacks.get(1).id);
+                }
+            }
+        }
+
+        scheduler.schedule(topologies, cluster);
+
+        Map<String, SchedulerAssignment> assignments = new TreeMap<>(cluster.getAssignments());
+        assertEquals(2, assignments.size());
+
+        // topology id -> (rack name -> number of worker slots used on that rack)
+        Map<String, Map<String, AtomicLong>> topoPerRackCount = new HashMap<>();
+        for (Map.Entry<String, SchedulerAssignment> entry : assignments.entrySet()) {
+            SchedulerAssignment sa = entry.getValue();
+            Map<String, AtomicLong> slotsPerRack = new TreeMap<>();
+            for (WorkerSlot slot : sa.getSlots()) {
+                String nodeId = slot.getNodeId();
+                String rack = supervisorIdToRackName(nodeId);
+                slotsPerRack.computeIfAbsent(rack, (r) -> new AtomicLong(0)).incrementAndGet();
+            }
+            LOG.info("{} => {}", entry.getKey(), slotsPerRack);
+            topoPerRackCount.put(entry.getKey(), slotsPerRack);
+        }
+
+        Map<String, AtomicLong> simpleCount = topoPerRackCount.get("topology-simple-0");
+        assertNotNull(simpleCount);
+        //Because the simple topology was scheduled first we want to be sure that it didn't put anything on
+        // the GPU nodes.
+        assertEquals(1, simpleCount.size()); //Only 1 rack is in use
+        assertFalse(simpleCount.containsKey("r001")); //r001 is the second rack with GPUs
+        assertTrue(simpleCount.containsKey("r000")); //r000 is the first rack with no GPUs
+
+        //We don't really care too much about the scheduling of topology-gpu-0, because it was scheduled.
+    }
+
+    /**
+     * Releases every fifth worker slot (in the iteration order of {@code getSlots()}) of each
+     * scheduler assignment currently held by the cluster, i.e. about one-fifth of the slots.
+     *
+     * @param cluster cluster whose assigned worker slots are partially freed
+     */
+    private void freeSomeWorkerSlots(Cluster cluster) {
+        for (SchedulerAssignment assignment : cluster.getAssignments().values()) {
+            List<WorkerSlot> everyFifthSlot = new ArrayList<>();
+            int position = 0;
+            for (WorkerSlot slot : assignment.getSlots()) {
+                position++;
+                if (position % 5 != 0) {
+                    continue;
+                }
+                everyFifthSlot.add(slot);
+            }
+            // Slots are released per assignment, after that assignment's slots have been walked.
+            cluster.freeSlots(everyFifthSlot);
+        }
+    }
+
+    /**
+     * A topology that does not fit on a single rack must spill over onto exactly one more rack.
+     *
+     * <p>Each rack is sized to hold four-fifths of the topology's components, so after scheduling
+     * the topology is expected to occupy two of the three racks.
+     */
+    @Test
+    public void testFillUpRackAndSpilloverToNextRack() {
+        INimbus iNimbus = new INimbusTest();
+
+        // Resources requested by every component.
+        final double cpuPerComp = 100;
+        final double onHeapPerComp = 775;
+        final double offHeapPerComp = 25;
+
+        // Shape of the topology.
+        final String topologyName = "topology1";
+        final int spoutCnt = 1;
+        final int boltCnt = 5;
+        final int spoutParallelism = 100;
+        final int boltParallelism = 200;
+
+        // Shape of the cluster.
+        final int rackCnt = 3;
+        final int supersPerRack = 10;
+        final int portsPerSuper = 6;
+        final int zonesPerHost = 1;
+        final double numaMultiplier = 1.0;
+        final int firstRackNum = 0;
+        final int firstSuperNum = 0;
+
+        // One rack holds only 4/5 of the components: not enough for the whole topology.
+        final int totalComps = spoutCnt * spoutParallelism + boltCnt * boltParallelism;
+        final long compsPerRack = totalComps * 4 / 5;
+        final long compsPerSuper = compsPerRack / supersPerRack;
+        final double cpuPerSuper = cpuPerComp * compsPerSuper;
+        final double memPerSuper = (onHeapPerComp + offHeapPerComp) * compsPerSuper;
+        final double maxHeapSize = memPerSuper;
+
+        Map<String, SupervisorDetails> supervisors = genSupervisorsWithRacksAndNuma(
+            rackCnt, supersPerRack, zonesPerHost, portsPerSuper, firstRackNum, firstSuperNum,
+            cpuPerSuper, memPerSuper, Collections.emptyMap(), numaMultiplier);
+        TestDNSToSwitchMapping rackMapping = new TestDNSToSwitchMapping(supervisors.values());
+
+        Config config = new Config();
+        config.putAll(createGrasClusterConfig(cpuPerComp, onHeapPerComp, offHeapPerComp, null, null));
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, GenericResourceAwareStrategy.class.getName());
+
+        IScheduler scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(config, new StormMetricsRegistry());
+
+        TopologyDetails topology = genTopology(topologyName, config, spoutCnt, boltCnt,
+            spoutParallelism, boltParallelism, 0, 0, "user", maxHeapSize);
+
+        // Schedule the topology and verify that it ends up on exactly two racks.
+        Topologies topologies = new Topologies(topology);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()),
+            supervisors, new HashMap<>(), topologies, config);
+        cluster.setNetworkTopography(rackMapping.getRackToHosts());
+
+        scheduler.schedule(topologies, cluster);
+
+        Set<String> racksInUse = cluster.getAssignedRacks(topology.getId());
+        String failureMessage = "Racks for topology=" + topology.getId() + " is " + racksInUse;
+        assertEquals(failureMessage, 2, racksInUse.size());
+    }
+
+ /**
+ * Rack with low resources should be used to schedule an executor if it
has other executors for the same topology.
+ * <li>Schedule topo1 on one rack</li>
+ * <li>unassign some executors</li>
+ * <li>schedule another topology to partially fill up rack1</li>
+ * <li>Add another rack and schedule topology 1 remaining executors
again</li>
+ * <li>scheduling should utilize all resources on rack1 before before
trying next rack</li>
+ */
+ @Test
+ public void testPreferRackWithTopoExecutors() {
+ INimbus iNimbus = new INimbusTest();
+ double compPcore = 100;
+ double compOnHeap = 775;
+ double compOffHeap = 25;
+ int topo1NumSpouts = 1;
+ int topo1NumBolts = 5;
+ int topo1SpoutParallelism = 100;
+ int topo1BoltParallelism = 200;
+ int topo2NumSpouts = 1;
+ int topo2NumBolts = 5;
+ int topo2SpoutParallelism = 10;
+ int topo2BoltParallelism = 20;
+ final int numRacks = 3;
+ final int numSupersPerRack = 10;
+ final int numPortsPerSuper = 6;
+ final int numZonesPerHost = 1;
+ final double numaResourceMultiplier = 1.0;
+ int rackStartNum = 0;
+ int supStartNum = 0;
+ long compPerRack = (topo1NumSpouts * topo1SpoutParallelism +
topo1NumBolts * topo1BoltParallelism
+ + topo2NumSpouts * topo2SpoutParallelism); // enough for topo1
but not topo1+topo2
+ long compPerSuper = compPerRack / numSupersPerRack;
+ double cpuPerSuper = compPcore * compPerSuper;
+ double memPerSuper = (compOnHeap + compOffHeap) * compPerSuper;
+ double topo1MaxHeapSize = memPerSuper;
+ double topo2MaxHeapSize = memPerSuper;
+ final String topoName1 = "topology1";
+ final String topoName2 = "topology2";
+
+ Map<String, SupervisorDetails> supMap = genSupervisorsWithRacksAndNuma(
+ numRacks, numSupersPerRack, numZonesPerHost, numPortsPerSuper,
rackStartNum, supStartNum,
+ cpuPerSuper, memPerSuper, Collections.emptyMap(),
numaResourceMultiplier);
+ TestDNSToSwitchMapping testDNSToSwitchMapping = new
TestDNSToSwitchMapping(supMap.values());
+
+ Config config = new Config();
+ config.putAll(createGrasClusterConfig(compPcore, compOnHeap,
compOffHeap, null, null));
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY,
GenericResourceAwareStrategy.class.getName());
+
+ IScheduler scheduler = new ResourceAwareScheduler();
+ scheduler.prepare(config, new StormMetricsRegistry());
+
+ TopologyDetails td1 = genTopology(topoName1, config, topo1NumSpouts,
+ topo1NumBolts, topo1SpoutParallelism, topo1BoltParallelism, 0,
0, "user", topo1MaxHeapSize);
+
+ //Schedule the topo1 topology and ensure it fits on 1 rack
+ Topologies topologies = new Topologies(td1);
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new
StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+ cluster.setNetworkTopography(testDNSToSwitchMapping.getRackToHosts());
+
+ scheduler.schedule(topologies, cluster);
+ Set<String> assignedRacks = cluster.getAssignedRacks(td1.getId());
+ assertEquals("Racks for topology=" + td1.getId() + " is " +
assignedRacks, 1, assignedRacks.size());
+
+ TopologyBuilder builder = topologyBuilder(topo2NumSpouts,
topo2NumBolts, topo2SpoutParallelism, topo2BoltParallelism);
+ TopologyDetails td2 = topoToTopologyDetails(topoName2, config,
builder.createTopology(), 0, 0,"user", topo2MaxHeapSize);
+
+ //Now schedule GPU but with the simple topology in place.
+ topologies = new Topologies(td1, td2);
+ cluster = new Cluster(cluster, topologies);
+ scheduler.schedule(topologies, cluster);
+
+ assignedRacks = cluster.getAssignedRacks(td1.getId(), td2.getId());
+ assertEquals("Racks for topologies=" + td1.getId() + "/" + td2.getId()
+ " is " + assignedRacks, 2, assignedRacks.size());
+
+ // topo2 gets scheduled on its own rack because it is empty and
available
+ assignedRacks = cluster.getAssignedRacks(td2.getId());
+ assertEquals("Racks for topologies=" + td2.getId() + " is " +
assignedRacks, 1, assignedRacks.size());
+
+ // now unassign topo2, expect only one rack to be in use; free some
slots and reschedule topo1 some topo1 executors
+ cluster.unassign(td2.getId());
+ assignedRacks = cluster.getAssignedRacks(td2.getId());
+ assertEquals("After unassigning topology " + td2.getId() + ", racks
for topology=" + td2.getId() + " is " + assignedRacks,
+ 0, assignedRacks.size());
+ assignedRacks = cluster.getAssignedRacks(td1.getId());
+ assertEquals("After unassigning topology " + td2.getId() + ", racks
for topology=" + td1.getId() + " is " + assignedRacks,
+ 1, assignedRacks.size());
+ assertFalse("Topology " + td1.getId() + " should be fully assigned
before freeing slots", cluster.needsSchedulingRas(td1));
+ freeSomeWorkerSlots(cluster);
+ assertTrue("Topology " + td1.getId() + " should need scheduling after
freeing slots", cluster.needsSchedulingRas(td1));
+
+ // then reschedule executors
+ scheduler.schedule(topologies, cluster);
+
+ // only one rack should be in use by topology1
+ assignedRacks = cluster.getAssignedRacks(td1.getId());
+ assertEquals("After reassigning topology " + td2.getId() + ", racks
for topology=" + td1.getId() + " is " + assignedRacks,
+ 1, assignedRacks.size());
+ }
+
+ /**
+ * Assign and then clear out a rack to host list mapping in
cluster.networkTopography.
+ * Expected behavior is that:
+ * <li>the rack without hosts does not show up in {@link
NodeSorterHostProximity#getSortedRacks()}</li>
+ * <li>all the supervisor nodes still get returned in {@link
NodeSorterHostProximity#sortAllNodes()} ()}</li>
+ * <li>supervisors on cleared rack show up under {@link
DNSToSwitchMapping#DEFAULT_RACK}</li>
+ *
+ * <p>
+ * Force an usual condition, where one of the racks is still passed
to LazyNodeSortingIterator with
+ * an empty list and then ensure that code is resilient.
+ * </p>
+ */
+ @Test
+ void testWithImpairedClusterNetworkTopography() {
+ INimbus iNimbus = new INimbusTest();
+ double compPcore = 100;
+ double compOnHeap = 775;
+ double compOffHeap = 25;
+ int topo1NumSpouts = 1;
+ int topo1NumBolts = 5;
+ int topo1SpoutParallelism = 100;
+ int topo1BoltParallelism = 200;
+ final int numSupersPerRack = 10;
+ final int numPortsPerSuper = 66;
+ long compPerRack = (topo1NumSpouts * topo1SpoutParallelism +
topo1NumBolts * topo1BoltParallelism + 10);
+ long compPerSuper = compPerRack / numSupersPerRack;
+ double cpuPerSuper = compPcore * compPerSuper;
+ double memPerSuper = (compOnHeap + compOffHeap) * compPerSuper;
+ double topo1MaxHeapSize = memPerSuper;
+ final String topoName1 = "topology1";
+ int numRacks = 3;
+
+ Map<String, SupervisorDetails> supMap =
genSupervisorsWithRacks(numRacks, numSupersPerRack, numPortsPerSuper,
+ 0, 0, cpuPerSuper, memPerSuper, new HashMap<>());
+ TestDNSToSwitchMapping testDNSToSwitchMapping = new
TestDNSToSwitchMapping(supMap.values());
+
+ Config config = new Config();
+ config.putAll(createGrasClusterConfig(compPcore, compOnHeap,
compOffHeap, null, null));
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY,
GenericResourceAwareStrategy.class.getName());
+
+ IScheduler scheduler = new ResourceAwareScheduler();
+ scheduler.prepare(config, new StormMetricsRegistry());
+
+ TopologyDetails td1 = genTopology(topoName1, config, topo1NumSpouts,
+ topo1NumBolts, topo1SpoutParallelism, topo1BoltParallelism, 0, 0,
"user", topo1MaxHeapSize);
+
+ Topologies topologies = new Topologies(td1);
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new
StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+ cluster.setNetworkTopography(testDNSToSwitchMapping.getRackToHosts());
+
+ Map<String, List<String>> networkTopography =
cluster.getNetworkTopography();
+ assertEquals("Expecting " + numRacks + " racks found " +
networkTopography.size(), numRacks, networkTopography.size());
+ assertTrue("Expecting racks count to be >= 3, found " +
networkTopography.size(), networkTopography.size() >= 3);
+
+ // Impair cluster.networkTopography and set one rack to have zero
hosts, getSortedRacks should exclude this rack.
+ // Keep, the supervisorDetails unchanged - confirm that these nodes
are not lost even with incomplete networkTopography
+ String rackIdToZero =
networkTopography.keySet().stream().findFirst().get();
+ impairClusterRack(cluster, rackIdToZero, true, false);
+
+ NodeSorterHostProximity nodeSorterHostProximity = new
NodeSorterHostProximity(cluster, td1);
+ nodeSorterHostProximity.getSortedRacks().forEach(x ->
assertNotEquals(x.id, rackIdToZero));
+
+ // confirm that the above action has not lost the hosts and that they
appear under the DEFAULT rack
+ {
+ Set<String> seenRacks = new HashSet<>();
+ nodeSorterHostProximity.getSortedRacks().forEach(x ->
seenRacks.add(x.id));
+ assertEquals("Expecting rack cnt to be still " + numRacks,
numRacks, seenRacks.size());
+ assertTrue("Expecting to see default-rack=" +
DNSToSwitchMapping.DEFAULT_RACK + " in sortedRacks",
+ seenRacks.contains(DNSToSwitchMapping.DEFAULT_RACK));
+ }
+
+ // now check if node/supervisor is missing when sorting all nodes
+ Set<String> expectedNodes = supMap.keySet();
+ Set<String> seenNodes = new HashSet<>();
+ nodeSorterHostProximity.prepare(null);
+ nodeSorterHostProximity.sortAllNodes().forEach( n -> seenNodes.add(n));
+ assertEquals("Expecting see all supervisors ", expectedNodes,
seenNodes);
+
+ // Now fully impair the cluster - confirm no default rack
+ {
+ cluster = new Cluster(iNimbus, new ResourceMetrics(new
StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+ cluster.setNetworkTopography(new
TestDNSToSwitchMapping(supMap.values()).getRackToHosts());
+ impairClusterRack(cluster, rackIdToZero, true, true);
+ Set<String> seenRacks = new HashSet<>();
+ NodeSorterHostProximity nodeSorterHostProximity2 = new
NodeSorterHostProximity(cluster, td1);
+ nodeSorterHostProximity2.getSortedRacks().forEach(x ->
seenRacks.add(x.id));
+ Map<String, Set<String>> rackIdToHosts =
nodeSorterHostProximity2.getRackIdToHosts();
+ String dumpOfRacks = rackIdToHosts.entrySet().stream()
+ .map(x -> String.format("rack %s -> hosts [%s]", x.getKey(),
String.join(",", x.getValue())))
+ .collect(Collectors.joining("\n\t"));
+ assertEquals("Expecting rack cnt to be " + (numRacks - 1) + " but
found " + seenRacks.size() + "\n\t" + dumpOfRacks,
+ numRacks - 1, seenRacks.size());
+ assertFalse("Found default-rack=" +
DNSToSwitchMapping.DEFAULT_RACK + " in \n\t" + dumpOfRacks,
+ seenRacks.contains(DNSToSwitchMapping.DEFAULT_RACK));
+ }
+ }
+
+ /**
+ * Black list all nodes for a rack before sorting nodes.
+ * Confirm that {@link NodeSorterHostProximity#sortAllNodes()} still works.
+ *
+ */
+ @Test
+ void testWithBlackListedHosts() {
+ INimbus iNimbus = new INimbusTest();
+ double compPcore = 100;
+ double compOnHeap = 775;
+ double compOffHeap = 25;
+ int topo1NumSpouts = 1;
+ int topo1NumBolts = 5;
+ int topo1SpoutParallelism = 100;
+ int topo1BoltParallelism = 200;
+ final int numSupersPerRack = 10;
+ final int numPortsPerSuper = 66;
+ long compPerRack = (topo1NumSpouts * topo1SpoutParallelism +
topo1NumBolts * topo1BoltParallelism + 10);
+ long compPerSuper = compPerRack / numSupersPerRack;
+ double cpuPerSuper = compPcore * compPerSuper;
+ double memPerSuper = (compOnHeap + compOffHeap) * compPerSuper;
+ double topo1MaxHeapSize = memPerSuper;
+ final String topoName1 = "topology1";
+ int numRacks = 3;
+
+ Map<String, SupervisorDetails> supMap =
genSupervisorsWithRacks(numRacks, numSupersPerRack, numPortsPerSuper,
+ 0, 0, cpuPerSuper, memPerSuper, new HashMap<>());
+ TestDNSToSwitchMapping testDNSToSwitchMapping = new
TestDNSToSwitchMapping(supMap.values());
+
+ Config config = new Config();
+ config.putAll(createGrasClusterConfig(compPcore, compOnHeap,
compOffHeap, null, null));
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY,
GenericResourceAwareStrategy.class.getName());
+
+ IScheduler scheduler = new ResourceAwareScheduler();
+ scheduler.prepare(config, new StormMetricsRegistry());
+
+ TopologyDetails td1 = genTopology(topoName1, config, topo1NumSpouts,
+ topo1NumBolts, topo1SpoutParallelism, topo1BoltParallelism, 0, 0,
"user", topo1MaxHeapSize);
+
+ Topologies topologies = new Topologies(td1);
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new
StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+ cluster.setNetworkTopography(testDNSToSwitchMapping.getRackToHosts());
+
+ Map<String, List<String>> networkTopography =
cluster.getNetworkTopography();
+ assertEquals("Expecting " + numRacks + " racks found " +
networkTopography.size(), numRacks, networkTopography.size());
+ assertTrue("Expecting racks count to be >= 3, found " +
networkTopography.size(), networkTopography.size() >= 3);
+
+ Set<String> blackListedHosts = new HashSet<>();
+ List<SupervisorDetails> supArray = new ArrayList<>(supMap.values());
+ for (int i = 0 ; i < numSupersPerRack ; i++) {
+ blackListedHosts.add(supArray.get(i).getHost());
+ }
+ blacklistHostsAndSortNodes(blackListedHosts, supMap.values(), cluster,
td1);
+
+ String rackToClear =
cluster.getNetworkTopography().keySet().stream().findFirst().get();
+ blackListedHosts = new
HashSet<>(cluster.getNetworkTopography().get(rackToClear));
+ blacklistHostsAndSortNodes(blackListedHosts, supMap.values(), cluster,
td1);
+ }
+
+    /**
+     * Impair the cluster by blacklisting some hosts, then verify the node sorter's view of it.
+     * <ul>
+     * <li>the hosts missing from {@link NodeSorterHostProximity#getRackIdToHosts()} must be exactly
+     *     the blacklisted hosts</li>
+     * <li>{@link NodeSorterHostProximity#sortAllNodes()} must return exactly the supervisors whose
+     *     host is not blacklisted</li>
+     * </ul>
+     *
+     * @param blackListedHosts hosts to blacklist on the cluster
+     * @param sups all supervisors of the cluster
+     * @param cluster cluster to impair
+     * @param td1 topology used to construct the node sorter
+     */
+    private void blacklistHostsAndSortNodes(
+            Set<String> blackListedHosts, Collection<SupervisorDetails> sups, Cluster cluster, TopologyDetails td1) {
+        LOG.info("blackListedHosts={}", blackListedHosts);
+        cluster.setBlacklistedHosts(blackListedHosts);
+
+        NodeSorterHostProximity nodeSorterHostProximity = new NodeSorterHostProximity(cluster, td1);
+        // confirm that the above action loses hosts
+        {
+            // Collect into an explicit HashSet: removeAll below needs a mutable set.
+            Set<String> removedHosts = sups.stream()
+                .map(SupervisorDetails::getHost)
+                .collect(Collectors.toCollection(HashSet::new));
+            // NOTE(review): seenRacks is never asserted on; the iteration is kept in case walking
+            // the sorted racks matters to the sorter's state - confirm before removing.
+            Set<String> seenRacks = new HashSet<>();
+            nodeSorterHostProximity.getSortedRacks().forEach(x -> seenRacks.add(x.id));
+            Set<String> seenHosts = new HashSet<>();
+            nodeSorterHostProximity.getRackIdToHosts().forEach((k, v) -> seenHosts.addAll(v));
+            removedHosts.removeAll(seenHosts);
+            // expected = what was blacklisted, actual = what disappeared from the sorter's view
+            assertEquals("Expecting only blacklisted hosts removed", blackListedHosts, removedHosts);
+        }
+
+        // now check if sortAllNodes still works
+        Set<String> expectedNodes = sups.stream()
+            .filter(x -> !blackListedHosts.contains(x.getHost()))
+            .map(SupervisorDetails::getId)
+            .collect(Collectors.toSet());
+        Set<String> seenNodes = new HashSet<>();
+        nodeSorterHostProximity.prepare(null);
+        nodeSorterHostProximity.sortAllNodes().forEach(seenNodes::add);
+        assertEquals("Expecting see all supervisors ", expectedNodes, seenNodes);
+    }
+
+    /**
+     * Impair the cluster for the given rack in up to two ways.
+     * <ul>
+     * <li>empty the rack's host list in the network topography (the rack entry itself stays)</li>
+     * <li>remove the supervisors running on the hosts that were listed for the rack</li>
+     * </ul>
+     * The rack's hosts are captured before the host list is cleared, so both impairments can be combined.
+     *
+     * @param cluster cluster to impair
+     * @param rackId rackId to clear
+     * @param clearNetworkTopography if true, then clear (but not remove) the hosts in list for the rack.
+     * @param clearSupervisorMap if true, then remove supervisors for the rack.
+     */
+    private void impairClusterRack(Cluster cluster, String rackId, boolean clearNetworkTopography,
+                                   boolean clearSupervisorMap) {
+        // Snapshot of the rack's hosts, taken before any clearing happens.
+        Set<String> hostsOnRack =
+            new HashSet<>(cluster.getNetworkTopography().computeIfAbsent(rackId, k -> new ArrayList<>()));
+
+        if (clearNetworkTopography) {
+            cluster.getNetworkTopography().computeIfAbsent(rackId, k -> new ArrayList<>()).clear();
+        }
+
+        if (!clearSupervisorMap) {
+            return;
+        }
+
+        // Gather the ids first, then remove them from the cluster's supervisor map.
+        Set<String> doomedSupervisorIds = new HashSet<>();
+        for (String host : hostsOnRack) {
+            for (SupervisorDetails supervisor : cluster.getSupervisorsByHost(host)) {
+                doomedSupervisorIds.add(supervisor.getId());
+            }
+        }
+        Map<String, SupervisorDetails> supervisorsById = cluster.getSupervisors();
+        for (String supervisorId : doomedSupervisorIds) {
+            supervisorsById.remove(supervisorId);
+        }
+    }
+}
\ No newline at end of file