Ethanlm commented on a change in pull request #3379:
URL: https://github.com/apache/storm/pull/3379#discussion_r580371328
##########
File path:
storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
##########
@@ -384,14 +391,29 @@ public double
calculateMinPercentageUsedBy(NormalizedResources used, double tota
/**
* If a node or rack has a kind of resource not in a request, make that
resource negative so when sorting that node or rack will
- * be less likely to be selected.
+ * be less likely to be selected. If the resource is in the request, make
that resource positive.
* @param request the requested resources.
*/
public void updateForRareResourceAffinity(NormalizedResources request) {
int length = Math.min(this.otherResources.length,
request.otherResources.length);
for (int i = 0; i < length; i++) {
- if (request.getResourceAt(i) == 0.0) {
- this.otherResources[i] = -1 * this.otherResources[i];
+ if (request.getResourceAt(i) == 0.0 && this.otherResources[i] >
0.0) {
+ this.otherResources[i] = -this.otherResources[i]; // make
negative
+ } else if (request.getResourceAt(i) > 0.0 &&
this.otherResources[i] < 0.0) {
+ this.otherResources[i] = -this.otherResources[i]; // make
positive
+ }
+ }
+ }
+
+ /**
+ * If the resource is negative, then make that resource positive.
+ * This will undo effects of {@link
#updateForRareResourceAffinity(NormalizedResources)}.
+ */
+ public void revertUpdatesForRareResourceAffinity() {
Review comment:
Can we remove this? It doesn't seem to be used anywhere
##########
File path:
storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
##########
@@ -384,14 +391,29 @@ public double
calculateMinPercentageUsedBy(NormalizedResources used, double tota
/**
* If a node or rack has a kind of resource not in a request, make that
resource negative so when sorting that node or rack will
- * be less likely to be selected.
+ * be less likely to be selected. If the resource is in the request, make
that resource positive.
* @param request the requested resources.
*/
public void updateForRareResourceAffinity(NormalizedResources request) {
int length = Math.min(this.otherResources.length,
request.otherResources.length);
for (int i = 0; i < length; i++) {
- if (request.getResourceAt(i) == 0.0) {
- this.otherResources[i] = -1 * this.otherResources[i];
+ if (request.getResourceAt(i) == 0.0 && this.otherResources[i] >
0.0) {
+ this.otherResources[i] = -this.otherResources[i]; // make
negative
+ } else if (request.getResourceAt(i) > 0.0 &&
this.otherResources[i] < 0.0) {
Review comment:
Can you please help me understand why this change is necessary? Did you
find any bugs here? Thanks
##########
File path:
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java
##########
@@ -0,0 +1,768 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling.sorter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nonnull;
+import org.apache.storm.Config;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.RasNode;
+import org.apache.storm.scheduler.resource.RasNodes;
+import
org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import
org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import
org.apache.storm.scheduler.resource.strategies.scheduling.BaseResourceAwareStrategy;
+import
org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesItem;
+import
org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesSummary;
+import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NodeSorterHostProximity implements INodeSorter {
+ private static final Logger LOG =
LoggerFactory.getLogger(NodeSorterHostProximity.class);
+
+ // instance variables from class instantiation
+ protected final BaseResourceAwareStrategy.NodeSortType nodeSortType;
+
+ protected Cluster cluster;
+ protected TopologyDetails topologyDetails;
+
+ // Instance variables derived from Cluster.
+ private final Map<String, List<String>> networkTopography;
+ private final Map<String, String> superIdToRack = new HashMap<>();
+ private final Map<String, List<RasNode>> hostnameToNodes = new HashMap<>();
+ private final Map<String, String> nodeIdToHostname = new HashMap<>();
+ private final Map<String, Set<String>> rackIdToHosts;
+ protected List<String> greyListedSupervisorIds;
+
+ // Instance variables from Cluster and TopologyDetails.
+ protected List<String> favoredNodeIds;
+ protected List<String> unFavoredNodeIds;
+
+ // Updated in prepare method
+ ExecutorDetails exec;
+
+ public NodeSorterHostProximity(Cluster cluster, TopologyDetails
topologyDetails) {
+ this(cluster, topologyDetails,
BaseResourceAwareStrategy.NodeSortType.COMMON);
+ }
+
+ /**
+ * Initialize for the default implementation node sorting.
+ *
+ * <p>
+ * <li>{@link BaseResourceAwareStrategy.NodeSortType#GENERIC_RAS} sorting
implemented in
+ * {@link #sortObjectResourcesGeneric(ObjectResourcesSummary,
ExecutorDetails, NodeSorterHostProximity.ExistingScheduleFunc)}</li>
+ * <li>{@link BaseResourceAwareStrategy.NodeSortType#DEFAULT_RAS} sorting
implemented in
+ * {@link #sortObjectResourcesDefault(ObjectResourcesSummary,
NodeSorterHostProximity.ExistingScheduleFunc)}</li>
+ * <li>{@link BaseResourceAwareStrategy.NodeSortType#COMMON} sorting
implemented in
+ * {@link #sortObjectResourcesCommon(ObjectResourcesSummary,
ExecutorDetails, NodeSorterHostProximity.ExistingScheduleFunc)}</li>
+ * </p>
+ *
+ * @param cluster for which nodes will be sorted.
+ * @param topologyDetails the topology to sort for.
+ * @param nodeSortType type of sorting to be applied to object resource
collection {@link BaseResourceAwareStrategy.NodeSortType}.
+ */
+ public NodeSorterHostProximity(Cluster cluster, TopologyDetails
topologyDetails, BaseResourceAwareStrategy.NodeSortType nodeSortType) {
+ this.cluster = cluster;
+ this.topologyDetails = topologyDetails;
+ this.nodeSortType = nodeSortType;
+
+ // from Cluster
+ networkTopography = cluster.getNetworkTopography();
+ greyListedSupervisorIds = cluster.getGreyListedSupervisors();
+ Map<String, String> hostToRack = cluster.getHostToRack();
+ RasNodes nodes = new RasNodes(cluster);
+ for (RasNode node: nodes.getNodes()) {
+ String superId = node.getId();
+ String hostName = node.getHostname();
+ if (hostName == null) {
+ // ignore supervisors on blacklistedHosts
+ continue;
+ }
+ String rackId = hostToRack.getOrDefault(hostName,
DNSToSwitchMapping.DEFAULT_RACK);
+ superIdToRack.put(superId, rackId);
+ hostnameToNodes.computeIfAbsent(hostName, (hn) -> new
ArrayList<>()).add(node);
+ nodeIdToHostname.put(superId, hostName);
+ }
+ rackIdToHosts = computeRackToHosts(cluster, superIdToRack);
+
+ // from TopologyDetails
+ Map<String, Object> topoConf = topologyDetails.getConf();
+
+ // From Cluster and TopologyDetails - and cleaned-up
+ favoredNodeIds = makeHostToNodeIds((List<String>)
topoConf.get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES));
+ unFavoredNodeIds = makeHostToNodeIds((List<String>)
topoConf.get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES));
+ favoredNodeIds.removeAll(greyListedSupervisorIds);
+ unFavoredNodeIds.removeAll(greyListedSupervisorIds);
+ unFavoredNodeIds.removeAll(favoredNodeIds);
+ }
+
+ @VisibleForTesting
+ public Map<String, Set<String>> getRackIdToHosts() {
+ return rackIdToHosts;
+ }
+
+ @Override
+ public void prepare(ExecutorDetails exec) {
+ this.exec = exec;
+ }
+
+ /**
+ * Get a key corresponding to this executor resource request. The key
contains all non-zero resources requested by
+ * the executor.
+ *
+ * <p>This key can be used to retrieve a pre-sorted list of
racks/hosts/nodes. So executors with a similar set of
+ * rare resource requests will reuse the same cached list instead of
creating a new sorted list of nodes.</p>
+ *
+ * @param exec executor whose requested resources are being examined.
+ * @return a set of resource names that have values greater than zero.
+ */
+ public Set<String> getExecRequestKey(@Nonnull ExecutorDetails exec) {
Review comment:
Seems not used anywhere. Can we delete it?
##########
File path:
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java
##########
@@ -0,0 +1,717 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling.sorter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nonnull;
+import org.apache.storm.Config;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.RasNode;
+import org.apache.storm.scheduler.resource.RasNodes;
+import
org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import
org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import
org.apache.storm.scheduler.resource.strategies.scheduling.BaseResourceAwareStrategy;
+import
org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesItem;
+import
org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesSummary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NodeSorterHostProximity implements INodeSorter {
+ private static final Logger LOG =
LoggerFactory.getLogger(NodeSorterHostProximity.class);
+
+ // instance variables from class instantiation
+ protected final BaseResourceAwareStrategy.NodeSortType nodeSortType;
+
+ protected Cluster cluster;
+ protected TopologyDetails topologyDetails;
+
+ // Instance variables derived from Cluster.
+ private final Map<String, List<String>> networkTopography;
+ private final Map<String, String> superIdToRack = new HashMap<>();
+ private final Map<String, List<RasNode>> hostnameToNodes = new HashMap<>();
+ private final Map<String, String> nodeIdToHostname = new HashMap<>();
+ //private final Map<String, List<RasNode>> rackIdToNodes = new HashMap<>();
+ private final Map<String, List<String>> rackIdToHosts = new HashMap<>();
+ protected List<String> greyListedSupervisorIds;
+
+ // Instance variables from Cluster and TopologyDetails.
+ protected List<String> favoredNodeIds;
+ protected List<String> unFavoredNodeIds;
+
+ // Updated in prepare method
+ ExecutorDetails exec;
+
+ public NodeSorterHostProximity(Cluster cluster, TopologyDetails
topologyDetails) {
+ this(cluster, topologyDetails,
BaseResourceAwareStrategy.NodeSortType.COMMON);
+ }
+
+ /**
+ * Initialize for the default implementation node sorting.
+ *
+ * <p>
+ * <li>{@link BaseResourceAwareStrategy.NodeSortType#GENERIC_RAS} sorting
implemented in
+ * {@link #sortObjectResourcesGeneric(ObjectResourcesSummary,
ExecutorDetails, NodeSorterHostProximity.ExistingScheduleFunc)}</li>
+ * <li>{@link BaseResourceAwareStrategy.NodeSortType#DEFAULT_RAS} sorting
implemented in
+ * {@link #sortObjectResourcesDefault(ObjectResourcesSummary,
NodeSorterHostProximity.ExistingScheduleFunc)}</li>
+ * <li>{@link BaseResourceAwareStrategy.NodeSortType#COMMON} sorting
implemented in
+ * {@link #sortObjectResourcesCommon(ObjectResourcesSummary,
ExecutorDetails, NodeSorterHostProximity.ExistingScheduleFunc)}</li>
+ * </p>
+ *
+ * @param cluster for which nodes will be sorted.
+ * @param topologyDetails the topology to sort for.
+ * @param nodeSortType type of sorting to be applied to object resource
collection {@link BaseResourceAwareStrategy.NodeSortType}.
+ */
+ public NodeSorterHostProximity(Cluster cluster, TopologyDetails
topologyDetails, BaseResourceAwareStrategy.NodeSortType nodeSortType) {
+ this.cluster = cluster;
+ this.topologyDetails = topologyDetails;
+ this.nodeSortType = nodeSortType;
+
+ // from Cluster
+ networkTopography = cluster.getNetworkTopography();
+ Map<String, String> hostToRack = cluster.getHostToRack();
+ RasNodes nodes = new RasNodes(cluster);
+ for (RasNode node: nodes.getNodes()) {
+ String superId = node.getId();
+ String hostName = node.getHostname();
+ String rackId = hostToRack.getOrDefault(hostName,
DNSToSwitchMapping.DEFAULT_RACK);
+ superIdToRack.put(superId, rackId);
+ hostnameToNodes.computeIfAbsent(hostName, (hn) -> new
ArrayList<>()).add(node);
+ nodeIdToHostname.put(superId, hostName);
+ rackIdToHosts.computeIfAbsent(rackId, id -> new
ArrayList<>()).add(hostName);
+ }
+ this.greyListedSupervisorIds = cluster.getGreyListedSupervisors();
+
+ // from TopologyDetails
+ Map<String, Object> topoConf = topologyDetails.getConf();
+
+ // From Cluster and TopologyDetails - and cleaned-up
+ favoredNodeIds = makeHostToNodeIds((List<String>)
topoConf.get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES));
+ unFavoredNodeIds = makeHostToNodeIds((List<String>)
topoConf.get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES));
+ favoredNodeIds.removeAll(greyListedSupervisorIds);
+ unFavoredNodeIds.removeAll(greyListedSupervisorIds);
+ unFavoredNodeIds.removeAll(favoredNodeIds);
+ }
+
+ @Override
+ public void prepare(ExecutorDetails exec) {
+ this.exec = exec;
+ }
+
+ /**
+ * Get a key corresponding to this executor resource request. The key
contains all non-zero resources requested by
+ * the executor.
+ *
+ * <p>This key can be used to retrieve a pre-sorted list of
racks/hosts/nodes. So executors with a similar set of
+ * rare resource requests will reuse the same cached list instead of
creating a new sorted list of nodes.</p>
+ *
+ * @param exec executor whose requested resources are being examined.
+ * @return a set of resource names that have values greater than zero.
+ */
+ public Set<String> getExecRequestKey(@Nonnull ExecutorDetails exec) {
+ NormalizedResourceRequest requestedResources =
topologyDetails.getTotalResources(exec);
+ Set<String> retVal = new HashSet<>();
+ requestedResources.toNormalizedMap().entrySet().forEach(
+ e -> {
+ if (e.getValue() > 0.0) {
+ retVal.add(e.getKey());
+ }
+ });
+ return retVal;
+ }
+
+ /**
+ * Scheduling uses {@link #sortAllNodes()} which eventually
+ * calls this method whose behavior can altered by setting {@link
#nodeSortType}.
+ *
+ * @param resourcesSummary contains all individual {@link
ObjectResourcesItem} as well as cumulative stats
+ * @param exec executor for which the sorting is done
+ * @param existingScheduleFunc a function to get existing executors
already scheduled on this object
+ * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
+ */
+ protected Iterable<ObjectResourcesItem> sortObjectResources(
+ ObjectResourcesSummary resourcesSummary, ExecutorDetails exec,
ExistingScheduleFunc existingScheduleFunc) {
+ switch (nodeSortType) {
+ case DEFAULT_RAS:
+ return sortObjectResourcesDefault(resourcesSummary,
existingScheduleFunc);
+ case GENERIC_RAS:
+ return sortObjectResourcesGeneric(resourcesSummary, exec,
existingScheduleFunc);
+ case COMMON:
+ return sortObjectResourcesCommon(resourcesSummary, exec,
existingScheduleFunc);
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Sort objects by the following three criteria.
+ *
+ * <li>
+ * The number executors of the topology that needs to be scheduled is
already on the object (node or rack)
+ * in descending order. The reasoning to sort based on criterion 1 is
so we schedule the rest of a topology on
+ * the same object (node or rack) as the existing executors of the
topology.
+ * </li>
+ *
+ * <li>
+ * The subordinate/subservient resource availability percentage of a
rack in descending order We calculate the
+ * resource availability percentage by dividing the resource
availability of the object (node or rack) by the
+ * resource availability of the entire rack or cluster depending on if
object references a node or a rack.
+ * How this differs from the DefaultResourceAwareStrategy is that the
percentage boosts the node or rack if it is
+ * requested by the executor that the sorting is being done for and
pulls it down if it is not.
+ * By doing this calculation, objects (node or rack) that have
exhausted or little of one of the resources mentioned
+ * above will be ranked after racks that have more balanced resource
availability and nodes or racks that have
+ * resources that are not requested will be ranked below. So we will
be less likely to pick a rack that
+ * have a lot of one resource but a low amount of another and have a
lot of resources that are not requested by the executor.
+ * This is similar to logic used {@link
#sortObjectResourcesGeneric(ObjectResourcesSummary, ExecutorDetails,
ExistingScheduleFunc)}.
+ * </li>
+ *
+ * <li>
+ * The tie between two nodes with same resource availability is broken
by using the node with lower minimum
+ * percentage used. This comparison was used in {@link
#sortObjectResourcesDefault(ObjectResourcesSummary, ExistingScheduleFunc)}
+ * but here it is made subservient to modified resource availability
used in
+ * {@link #sortObjectResourcesGeneric(ObjectResourcesSummary,
ExecutorDetails, ExistingScheduleFunc)}.
+ *
+ * </li>
+ *
+ * @param allResources contains all individual ObjectResources as
well as cumulative stats
+ * @param exec executor for which the sorting is done
+ * @param existingScheduleFunc a function to get existing executors
already scheduled on this object
+ * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
+ */
+ private Iterable<ObjectResourcesItem> sortObjectResourcesCommon(
+ final ObjectResourcesSummary allResources, final ExecutorDetails
exec,
+ final ExistingScheduleFunc existingScheduleFunc) {
+ // Copy and modify allResources
+ ObjectResourcesSummary affinityBasedAllResources = new
ObjectResourcesSummary(allResources);
+ final NormalizedResourceOffer availableResourcesOverall =
allResources.getAvailableResourcesOverall();
+ final NormalizedResourceRequest requestedResources = (exec != null) ?
topologyDetails.getTotalResources(exec) : null;
+ affinityBasedAllResources.getObjectResources().forEach(
+ x -> {
+ if (requestedResources != null) {
+ // negate unrequested resources
+
x.availableResources.updateForRareResourceAffinity(requestedResources);
+ }
+ x.minResourcePercent =
availableResourcesOverall.calculateMinPercentageUsedBy(x.availableResources);
+ x.avgResourcePercent =
availableResourcesOverall.calculateAveragePercentageUsedBy(x.availableResources);
+
+ LOG.trace("for {}: minResourcePercent={},
avgResourcePercent={}, numExistingSchedule={}",
+ x.id, x.minResourcePercent, x.avgResourcePercent,
+ existingScheduleFunc.getNumExistingSchedule(x.id));
+ }
+ );
+
+ // Use the following comparator to sort
+ Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+ int execsScheduled1 =
existingScheduleFunc.getNumExistingSchedule(o1.id);
+ int execsScheduled2 =
existingScheduleFunc.getNumExistingSchedule(o2.id);
+ if (execsScheduled1 > execsScheduled2) {
+ return -1;
+ } else if (execsScheduled1 < execsScheduled2) {
+ return 1;
+ }
+ if (o1.minResourcePercent > o2.minResourcePercent) {
+ return -1;
+ } else if (o1.minResourcePercent < o2.minResourcePercent) {
+ return 1;
+ }
+ double o1Avg = o1.avgResourcePercent;
+ double o2Avg = o2.avgResourcePercent;
+ if (o1Avg > o2Avg) {
+ return -1;
+ } else if (o1Avg < o2Avg) {
+ return 1;
+ }
+ return o1.id.compareTo(o2.id);
+ };
+ TreeSet<ObjectResourcesItem> sortedObjectResources = new
TreeSet(comparator);
+
sortedObjectResources.addAll(affinityBasedAllResources.getObjectResources());
+ LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
+ return sortedObjectResources;
+ }
+
+ /**
+ * Sort objects by the following two criteria.
+ *
+ * <li>the number executors of the topology that needs to be scheduled is
already on the
+ * object (node or rack) in descending order. The reasoning to sort based
on criterion 1 is so we schedule the rest
+ * of a topology on the same object (node or rack) as the existing
executors of the topology.</li>
+ *
+ * <li>the subordinate/subservient resource availability percentage of a
rack in descending order We calculate the
+ * resource availability percentage by dividing the resource availability
of the object (node or rack) by the
+ * resource availability of the entire rack or cluster depending on if
object references a node or a rack.
+ * How this differs from the DefaultResourceAwareStrategy is that the
percentage boosts the node or rack if it is
+ * requested by the executor that the sorting is being done for and pulls
it down if it is not.
+ * By doing this calculation, objects (node or rack) that have exhausted
or little of one of the resources mentioned
+ * above will be ranked after racks that have more balanced resource
availability and nodes or racks that have
+ * resources that are not requested will be ranked below. So we will be
less likely to pick a rack that
+ * have a lot of one resource but a low amount of another and have a lot
of resources that are not requested by the executor.</li>
+ *
+ * @param allResources contains all individual ObjectResources as
well as cumulative stats
+ * @param exec executor for which the sorting is done
+ * @param existingScheduleFunc a function to get existing executors
already scheduled on this object
+ * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
+ */
+ @Deprecated
+ private Iterable<ObjectResourcesItem> sortObjectResourcesGeneric(
+ final ObjectResourcesSummary allResources, ExecutorDetails exec,
+ final ExistingScheduleFunc existingScheduleFunc) {
+ ObjectResourcesSummary affinityBasedAllResources = new
ObjectResourcesSummary(allResources);
+ NormalizedResourceRequest requestedResources =
topologyDetails.getTotalResources(exec);
+ affinityBasedAllResources.getObjectResources()
+ .forEach(x ->
x.availableResources.updateForRareResourceAffinity(requestedResources));
+ final NormalizedResourceOffer availableResourcesOverall =
allResources.getAvailableResourcesOverall();
+
+ Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+ int execsScheduled1 =
existingScheduleFunc.getNumExistingSchedule(o1.id);
+ int execsScheduled2 =
existingScheduleFunc.getNumExistingSchedule(o2.id);
+ if (execsScheduled1 > execsScheduled2) {
+ return -1;
+ } else if (execsScheduled1 < execsScheduled2) {
+ return 1;
+ }
+ double o1Avg =
availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources);
+ double o2Avg =
availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources);
+ if (o1Avg > o2Avg) {
+ return -1;
+ } else if (o1Avg < o2Avg) {
+ return 1;
+ }
+ return o1.id.compareTo(o2.id);
+ };
+ TreeSet<ObjectResourcesItem> sortedObjectResources = new
TreeSet<>(comparator);
+
sortedObjectResources.addAll(affinityBasedAllResources.getObjectResources());
+ LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
+ return sortedObjectResources;
+ }
+
+ /**
+ * Sort objects by the following two criteria.
+ *
+ * <li>the number executors of the topology that needs to be scheduled is
already on the
+ * object (node or rack) in descending order. The reasoning to sort based
on criterion 1 is so we schedule the rest
+ * of a topology on the same object (node or rack) as the existing
executors of the topology.</li>
+ *
+ * <li>the subordinate/subservient resource availability percentage of a
rack in descending order We calculate the
+ * resource availability percentage by dividing the resource availability
of the object (node or rack) by the
+ * resource availability of the entire rack or cluster depending on if
object references a node or a rack.
+ * By doing this calculation, objects (node or rack) that have exhausted
or little of one of the resources mentioned
+ * above will be ranked after racks that have more balanced resource
availability. So we will be less likely to pick
+ * a rack that have a lot of one resource but a low amount of another.</li>
+ *
+ * @param allResources contains all individual ObjectResources as
well as cumulative stats
+ * @param existingScheduleFunc a function to get existing executors
already scheduled on this object
+ * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
+ */
+ @Deprecated
+ private Iterable<ObjectResourcesItem> sortObjectResourcesDefault(
+ final ObjectResourcesSummary allResources,
+ final ExistingScheduleFunc existingScheduleFunc) {
+
+ final NormalizedResourceOffer availableResourcesOverall =
allResources.getAvailableResourcesOverall();
+ for (ObjectResourcesItem objectResources :
allResources.getObjectResources()) {
+ objectResources.minResourcePercent =
+
availableResourcesOverall.calculateMinPercentageUsedBy(objectResources.availableResources);
+ objectResources.avgResourcePercent =
+
availableResourcesOverall.calculateAveragePercentageUsedBy(objectResources.availableResources);
+ LOG.trace("for {}: minResourcePercent={}, avgResourcePercent={},
numExistingSchedule={}",
+ objectResources.id, objectResources.minResourcePercent,
objectResources.avgResourcePercent,
+
existingScheduleFunc.getNumExistingSchedule(objectResources.id));
+ }
+
+ Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+ int execsScheduled1 =
existingScheduleFunc.getNumExistingSchedule(o1.id);
+ int execsScheduled2 =
existingScheduleFunc.getNumExistingSchedule(o2.id);
+ if (execsScheduled1 > execsScheduled2) {
+ return -1;
+ } else if (execsScheduled1 < execsScheduled2) {
+ return 1;
+ }
+ if (o1.minResourcePercent > o2.minResourcePercent) {
+ return -1;
+ } else if (o1.minResourcePercent < o2.minResourcePercent) {
+ return 1;
+ }
+ double diff = o1.avgResourcePercent - o2.avgResourcePercent;
+ if (diff > 0.0) {
+ return -1;
+ } else if (diff < 0.0) {
+ return 1;
+ }
+ return o1.id.compareTo(o2.id);
+ };
+ TreeSet<ObjectResourcesItem> sortedObjectResources = new
TreeSet<>(comparator);
+ sortedObjectResources.addAll(allResources.getObjectResources());
+ LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
+ return sortedObjectResources;
+ }
+
+ /**
+ * Nodes are sorted by two criteria.
+ *
+ * <p>1) the number executors of the topology that needs to be scheduled
is already on the node in
+ * descending order. The reasoning to sort based on criterion 1 is so we
schedule the rest of a topology on the same node as the
+ * existing executors of the topology.
+ *
+ * <p>2) the subordinate/subservient resource availability percentage of a
node in descending
+ * order We calculate the resource availability percentage by dividing the
resource availability that have exhausted or little of one of
+ * the resources mentioned above will be ranked after on the node by the
resource availability of the entire rack By doing this
+ * calculation, nodes that have more balanced resource availability.
So we will be less likely to pick a node that have a lot of
+ * one resource but a low amount of another.
+ *
+ * @param availHosts a list of all the hosts we want to sort
+ * @param rackId the rack id availNodes are a part of
+ * @return an iterable of sorted hosts.
+ */
+ private Iterable<ObjectResourcesItem> sortHosts(
+ List<String> availHosts, ExecutorDetails exec, String rackId,
+ Map<String, AtomicInteger> scheduledCount) {
+ ObjectResourcesSummary rackResourcesSummary = new
ObjectResourcesSummary("RACK");
+ availHosts.forEach(h -> {
+ ObjectResourcesItem hostItem = new ObjectResourcesItem(h);
+ for (RasNode x : hostnameToNodes.get(h)) {
+ hostItem.add(new ObjectResourcesItem(x.getId(),
x.getTotalAvailableResources(), x.getTotalResources(), 0, 0));
+ }
+ rackResourcesSummary.addObjectResourcesItem(hostItem);
+ });
+
+ LOG.debug(
+ "Rack {}: Overall Avail [ {} ] Total [ {} ]",
+ rackId,
+ rackResourcesSummary.getAvailableResourcesOverall(),
+ rackResourcesSummary.getTotalResourcesOverall());
+
+ return sortObjectResources(
+ rackResourcesSummary,
+ exec,
+ (hostId) -> {
+ AtomicInteger count = scheduledCount.get(hostId);
+ if (count == null) {
+ return 0;
+ }
+ return count.get();
+ });
+ }
+
+ /**
+ * Nodes are sorted by two criteria.
+ *
+ * <p>1) the number executors of the topology that needs to be scheduled
is already on the node in
+ * descending order. The reasoning to sort based on criterion 1 is so we
schedule the rest of a topology on the same node as the
+ * existing executors of the topology.
+ *
+ * <p>2) the subordinate/subservient resource availability percentage of a
node in descending
+ * order We calculate the resource availability percentage by dividing the
resource availability that have exhausted or little of one of
+ * the resources mentioned above will be ranked after on the node by the
resource availability of the entire rack By doing this
+ * calculation, nodes that have more balanced resource availability.
So we will be less likely to pick a node that have a lot of
+ * one resource but a low amount of another.
+ *
+ * @param availRasNodes a list of all the nodes we want to sort
+ * @param hostId the host-id that availNodes are a part of
+ * @return an {@link Iterable} of sorted {@link ObjectResourcesItem} for
nodes.
+ */
+ private Iterable<ObjectResourcesItem> sortNodes(
+ List<RasNode> availRasNodes, ExecutorDetails exec, String hostId,
+ Map<String, AtomicInteger> scheduledCount) {
+ ObjectResourcesSummary hostResourcesSummary = new
ObjectResourcesSummary("HOST");
+ availRasNodes.forEach(x ->
+ hostResourcesSummary.addObjectResourcesItem(
+ new ObjectResourcesItem(x.getId(),
x.getTotalAvailableResources(), x.getTotalResources(), 0, 0)
+ )
+ );
+
+ LOG.debug(
+ "Host {}: Overall Avail [ {} ] Total [ {} ]",
+ hostId,
+ hostResourcesSummary.getAvailableResourcesOverall(),
+ hostResourcesSummary.getTotalResourcesOverall());
+
+ return sortObjectResources(
+ hostResourcesSummary,
+ exec,
+ (superId) -> {
+ AtomicInteger count = scheduledCount.get(superId);
+ if (count == null) {
+ return 0;
+ }
+ return count.get();
+ });
+ }
+
+ protected List<String> makeHostToNodeIds(List<String> hosts) {
+ if (hosts == null) {
+ return Collections.emptyList();
+ }
+ List<String> ret = new ArrayList<>(hosts.size());
+ for (String host: hosts) {
+ List<RasNode> nodes = hostnameToNodes.get(host);
+ if (nodes != null) {
+ for (RasNode node : nodes) {
+ ret.add(node.getId());
+ }
+ }
+ }
+ return ret;
+ }
+
+ private class LazyNodeSortingIterator implements Iterator<String> {
+ private final LazyNodeSorting parent;
+ private final Iterator<ObjectResourcesItem> rackIterator;
+ private Iterator<ObjectResourcesItem> hostIterator;
+ private Iterator<ObjectResourcesItem> nodeIterator;
+ private String nextValueFromNode = null;
+ private final Iterator<String> pre;
+ private final Iterator<String> post;
+ private final Set<String> skip;
+
+ LazyNodeSortingIterator(LazyNodeSorting parent,
Iterable<ObjectResourcesItem> sortedRacks) {
+ this.parent = parent;
+ rackIterator = sortedRacks.iterator();
+ pre = favoredNodeIds.iterator();
+ post = Stream.concat(unFavoredNodeIds.stream(),
greyListedSupervisorIds.stream())
+ .collect(Collectors.toList())
+ .iterator();
+ skip = parent.skippedNodeIds;
+ }
+
+ private Iterator<ObjectResourcesItem> getNodeIterator() {
+ if (nodeIterator != null && nodeIterator.hasNext()) {
+ return nodeIterator;
+ }
+ //need to get the next host/node iterator
+ if (hostIterator != null && hostIterator.hasNext()) {
+ ObjectResourcesItem host = hostIterator.next();
+ final String hostId = host.id;
+ nodeIterator = parent.getSortedNodesForHost(hostId).iterator();
+ return nodeIterator;
+ }
+ if (rackIterator.hasNext()) {
+ ObjectResourcesItem rack = rackIterator.next();
+ final String rackId = rack.id;
+ hostIterator = parent.getSortedHostsForRack(rackId).iterator();
+ ObjectResourcesItem host = hostIterator.next();
+ final String hostId = host.id;
+ nodeIterator = parent.getSortedNodesForHost(hostId).iterator();
+ return nodeIterator;
+ }
+
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (pre.hasNext()) {
+ return true;
+ }
+ if (nextValueFromNode != null) {
+ return true;
+ }
+ while (true) {
+ //For the node we don't know if we have another one unless we
look at the contents
+ Iterator<ObjectResourcesItem> nodeIterator = getNodeIterator();
+ if (nodeIterator == null || !nodeIterator.hasNext()) {
+ break;
+ }
+ String tmp = nodeIterator.next().id;
+ if (!skip.contains(tmp)) {
+ nextValueFromNode = tmp;
+ return true;
+ }
+ }
+ return post.hasNext();
+ }
+
+ @Override
+ public String next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ if (pre.hasNext()) {
+ return pre.next();
+ }
+ if (nextValueFromNode != null) {
+ String tmp = nextValueFromNode;
+ nextValueFromNode = null;
+ return tmp;
+ }
+ return post.next();
+ }
+ }
+
    /**
     * Lazily sorts racks, hosts and nodes for a single executor. Racks are sorted
     * eagerly in the constructor; the hosts of each rack and the nodes of each host
     * are sorted on first access and cached, so only the portions of the cluster the
     * scheduler actually walks are ever sorted.
     */
    private class LazyNodeSorting implements Iterable<String> {
        // count of this topology's already-placed executors, keyed by host name
        private final Map<String, AtomicInteger> perHostScheduledCount = new HashMap<>();
        // count of this topology's already-placed executors, keyed by supervisor/node id
        private final Map<String, AtomicInteger> perNodeScheduledCount = new HashMap<>();
        private final Iterable<ObjectResourcesItem> sortedRacks;
        // lazily-filled cache: rack id -> sorted hosts
        private final Map<String, Iterable<ObjectResourcesItem>> cachedHosts = new HashMap<>();
        // lazily-filled cache: host name -> sorted nodes
        private final Map<String, Iterable<ObjectResourcesItem>> cachedNodesByHost = new HashMap<>();
        private final ExecutorDetails exec;
        // nodes emitted separately (favored/unfavored/grey-listed) by the iterator,
        // so they must be skipped in the sorted middle section
        private final Set<String> skippedNodeIds = new HashSet<>();

        LazyNodeSorting(ExecutorDetails exec) {
            this.exec = exec;
            skippedNodeIds.addAll(favoredNodeIds);
            skippedNodeIds.addAll(unFavoredNodeIds);
            skippedNodeIds.addAll(greyListedSupervisorIds);

            String topoId = topologyDetails.getId();
            SchedulerAssignment assignment = cluster.getAssignmentById(topoId);
            if (assignment != null) {
                // Tally this topology's executors already assigned per host and per node,
                // so the sort can prefer co-locating the remaining executors with them.
                for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry :
                        assignment.getSlotToExecutors().entrySet()) {
                    String superId = entry.getKey().getNodeId();
                    String hostId = nodeIdToHostname.get(superId);
                    perHostScheduledCount.computeIfAbsent(hostId, id -> new AtomicInteger(0))
                        .getAndAdd(entry.getValue().size());
                    perNodeScheduledCount.computeIfAbsent(superId, id -> new AtomicInteger(0))
                        .getAndAdd(entry.getValue().size());
                }
            }
            sortedRacks = getSortedRacks();
        }

        // Sorted hosts for a rack, computed once and cached.
        private Iterable<ObjectResourcesItem> getSortedHostsForRack(String rackId) {
            return cachedHosts.computeIfAbsent(rackId,
                id -> sortHosts(rackIdToHosts.getOrDefault(id, Collections.emptyList()), exec, id, perHostScheduledCount));
        }

        // Sorted nodes for a host, computed once and cached.
        private Iterable<ObjectResourcesItem> getSortedNodesForHost(String hostId) {
            return cachedNodesByHost.computeIfAbsent(hostId,
                id -> sortNodes(hostnameToNodes.getOrDefault(id, Collections.emptyList()), exec, id, perNodeScheduledCount));
        }

        @Override
        public Iterator<String> iterator() {
            return new LazyNodeSortingIterator(this, sortedRacks);
        }
    }
+
+ @Override
+ public Iterable<String> sortAllNodes() {
+ //LOG.info("sortAllNodes():cachedLazyNodeIterators.size={},
execRequestResourceKey={}",
+ // cachedLazyNodeIterators.size(), execRequestResourcesKey);
+ //return
cachedLazyNodeIterators.computeIfAbsent(execRequestResourcesKey, k -> new
LazyNodeSorting(exec));
+ return new LazyNodeSorting(exec);
+ }
+
+ private ObjectResourcesSummary createClusterSummarizedResources() {
+ ObjectResourcesSummary clusterResourcesSummary = new
ObjectResourcesSummary("Cluster");
+
+ //This is the first time so initialize the resources.
+ for (Map.Entry<String, List<String>> entry :
networkTopography.entrySet()) {
+ String rackId = entry.getKey();
+ List<String> nodeHosts = entry.getValue();
+ ObjectResourcesItem rack = new ObjectResourcesItem(rackId);
+ for (String nodeHost : nodeHosts) {
+ for (RasNode node : hostnameToNodes(nodeHost)) {
+
rack.availableResources.add(node.getTotalAvailableResources());
+ rack.totalResources.add(node.getTotalAvailableResources());
Review comment:
Should this be `node.getTotalResources()`?
##########
File path:
storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
##########
@@ -339,4 +354,31 @@ boolean wouldFit(
* Get the nimbus configuration.
*/
Map<String, Object> getConf();
+
+ /**
+ * Determine the list of racks on which topologyIds have been assigned.
Note that the returned set
+ * may contain {@link DNSToSwitchMapping#DEFAULT_RACK} if {@link
#getHostToRack()} is null or
+ * does not contain the assigned host.
+ *
+ * @param topologyIds for which assignments are examined.
+ * @return set of racks on which assignments have been made.
+ */
+ default Set<String> getAssignedRacks(String... topologyIds) {
Review comment:
This seems only used in test cases. Do we want to move it into some test
classes, instead of adding it in the interface? So the interface can be smaller
and developers don't need to worry about this method
##########
File path:
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java
##########
@@ -0,0 +1,768 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling.sorter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nonnull;
+import org.apache.storm.Config;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.RasNode;
+import org.apache.storm.scheduler.resource.RasNodes;
+import
org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import
org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import
org.apache.storm.scheduler.resource.strategies.scheduling.BaseResourceAwareStrategy;
+import
org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesItem;
+import
org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesSummary;
+import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class NodeSorterHostProximity implements INodeSorter {
    private static final Logger LOG = LoggerFactory.getLogger(NodeSorterHostProximity.class);

    // instance variables from class instantiation
    protected final BaseResourceAwareStrategy.NodeSortType nodeSortType;

    protected Cluster cluster;
    protected TopologyDetails topologyDetails;

    // Instance variables derived from Cluster.
    private final Map<String, List<String>> networkTopography;
    // supervisor id -> rack id
    private final Map<String, String> superIdToRack = new HashMap<>();
    // host name -> nodes on that host (excludes blacklisted hosts; see constructor)
    private final Map<String, List<RasNode>> hostnameToNodes = new HashMap<>();
    // supervisor id -> host name
    private final Map<String, String> nodeIdToHostname = new HashMap<>();
    // rack id -> host names in the rack
    private final Map<String, Set<String>> rackIdToHosts;
    protected List<String> greyListedSupervisorIds;

    // Instance variables from Cluster and TopologyDetails.
    protected List<String> favoredNodeIds;
    protected List<String> unFavoredNodeIds;

    // Updated in prepare method; the executor all subsequent sorting is done for.
    ExecutorDetails exec;
+
    /**
     * Create a sorter using the {@link BaseResourceAwareStrategy.NodeSortType#COMMON} sort type.
     *
     * @param cluster for which nodes will be sorted.
     * @param topologyDetails the topology to sort for.
     */
    public NodeSorterHostProximity(Cluster cluster, TopologyDetails topologyDetails) {
        this(cluster, topologyDetails, BaseResourceAwareStrategy.NodeSortType.COMMON);
    }
+
    /**
     * Initialize for the default implementation of node sorting.
     *
     * <p>
     * <li>{@link BaseResourceAwareStrategy.NodeSortType#GENERIC_RAS} sorting implemented in
     * {@link #sortObjectResourcesGeneric(ObjectResourcesSummary, ExecutorDetails, NodeSorterHostProximity.ExistingScheduleFunc)}</li>
     * <li>{@link BaseResourceAwareStrategy.NodeSortType#DEFAULT_RAS} sorting implemented in
     * {@link #sortObjectResourcesDefault(ObjectResourcesSummary, NodeSorterHostProximity.ExistingScheduleFunc)}</li>
     * <li>{@link BaseResourceAwareStrategy.NodeSortType#COMMON} sorting implemented in
     * {@link #sortObjectResourcesCommon(ObjectResourcesSummary, ExecutorDetails, NodeSorterHostProximity.ExistingScheduleFunc)}</li>
     * </p>
     *
     * @param cluster for which nodes will be sorted.
     * @param topologyDetails the topology to sort for.
     * @param nodeSortType type of sorting to be applied to object resource collection {@link BaseResourceAwareStrategy.NodeSortType}.
     */
    public NodeSorterHostProximity(Cluster cluster, TopologyDetails topologyDetails, BaseResourceAwareStrategy.NodeSortType nodeSortType) {
        this.cluster = cluster;
        this.topologyDetails = topologyDetails;
        this.nodeSortType = nodeSortType;

        // from Cluster
        networkTopography = cluster.getNetworkTopography();
        greyListedSupervisorIds = cluster.getGreyListedSupervisors();
        Map<String, String> hostToRack = cluster.getHostToRack();
        RasNodes nodes = new RasNodes(cluster);
        for (RasNode node: nodes.getNodes()) {
            String superId = node.getId();
            String hostName = node.getHostname();
            if (hostName == null) {
                // ignore supervisors on blacklistedHosts
                continue;
            }
            // hosts missing from the topology map fall back to the default rack
            String rackId = hostToRack.getOrDefault(hostName, DNSToSwitchMapping.DEFAULT_RACK);
            superIdToRack.put(superId, rackId);
            hostnameToNodes.computeIfAbsent(hostName, (hn) -> new ArrayList<>()).add(node);
            nodeIdToHostname.put(superId, hostName);
        }
        rackIdToHosts = computeRackToHosts(cluster, superIdToRack);

        // from TopologyDetails
        Map<String, Object> topoConf = topologyDetails.getConf();

        // From Cluster and TopologyDetails - and cleaned-up.
        // Unchecked casts: topology conf values are presumably List<String> per the
        // Config contract - NOTE(review): confirm upstream validation.
        favoredNodeIds = makeHostToNodeIds((List<String>) topoConf.get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES));
        unFavoredNodeIds = makeHostToNodeIds((List<String>) topoConf.get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES));
        // grey-listed supervisors are excluded from both lists; favored wins over unfavored
        favoredNodeIds.removeAll(greyListedSupervisorIds);
        unFavoredNodeIds.removeAll(greyListedSupervisorIds);
        unFavoredNodeIds.removeAll(favoredNodeIds);
    }
+
    /**
     * Get the rack id to host names mapping computed from the cluster.
     * Exposed for testing only.
     * NOTE(review): returns the internal mutable map - callers must not modify it.
     */
    @VisibleForTesting
    public Map<String, Set<String>> getRackIdToHosts() {
        return rackIdToHosts;
    }
+
    /**
     * Record the executor that subsequent {@link #sortAllNodes()} calls will sort for.
     */
    @Override
    public void prepare(ExecutorDetails exec) {
        this.exec = exec;
    }
+
+ /**
+ * Get a key corresponding to this executor resource request. The key
contains all non-zero resources requested by
+ * the executor.
+ *
+ * <p>This key can be used to retrieve pre-sorted list of
racks/hosts/nodes. So executors with similar set of
+ * rare resource request will be reuse the same cached list instead of
creating a new sorted list of nodes.</p>
+ *
+ * @param exec executor whose requested resources are being examined.
+ * @return a set of resource names that have values greater that zero.
+ */
+ public Set<String> getExecRequestKey(@Nonnull ExecutorDetails exec) {
+ NormalizedResourceRequest requestedResources =
topologyDetails.getTotalResources(exec);
+ Set<String> retVal = new HashSet<>();
+ requestedResources.toNormalizedMap().entrySet().forEach(
+ e -> {
+ if (e.getValue() > 0.0) {
+ retVal.add(e.getKey());
+ }
+ });
+ return retVal;
+ }
+
+ /**
+ * Scheduling uses {@link #sortAllNodes()} which eventually
+ * calls this method whose behavior can altered by setting {@link
#nodeSortType}.
+ *
+ * @param resourcesSummary contains all individual {@link
ObjectResourcesItem} as well as cumulative stats
+ * @param exec executor for which the sorting is done
+ * @param existingScheduleFunc a function to get existing executors
already scheduled on this object
+ * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
+ */
+ protected Iterable<ObjectResourcesItem> sortObjectResources(
+ ObjectResourcesSummary resourcesSummary, ExecutorDetails exec,
ExistingScheduleFunc existingScheduleFunc) {
+ switch (nodeSortType) {
+ case DEFAULT_RAS:
+ return sortObjectResourcesDefault(resourcesSummary,
existingScheduleFunc);
+ case GENERIC_RAS:
+ return sortObjectResourcesGeneric(resourcesSummary, exec,
existingScheduleFunc);
+ case COMMON:
+ return sortObjectResourcesCommon(resourcesSummary, exec,
existingScheduleFunc);
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Sort objects by the following three criteria.
+ *
+ * <li>
+ * The number executors of the topology that needs to be scheduled is
already on the object (node or rack)
+ * in descending order. The reasoning to sort based on criterion 1 is
so we schedule the rest of a topology on
+ * the same object (node or rack) as the existing executors of the
topology.
+ * </li>
+ *
+ * <li>
+ * The subordinate/subservient resource availability percentage of a
rack in descending order We calculate the
+ * resource availability percentage by dividing the resource
availability of the object (node or rack) by the
+ * resource availability of the entire rack or cluster depending on if
object references a node or a rack.
+ * How this differs from the DefaultResourceAwareStrategy is that the
percentage boosts the node or rack if it is
+ * requested by the executor that the sorting is being done for and
pulls it down if it is not.
+ * By doing this calculation, objects (node or rack) that have
exhausted or little of one of the resources mentioned
+ * above will be ranked after racks that have more balanced resource
availability and nodes or racks that have
+ * resources that are not requested will be ranked below . So we will
be less likely to pick a rack that
+ * have a lot of one resource but a low amount of another and have a
lot of resources that are not requested by the executor.
+ * This is similar to logic used {@link
#sortObjectResourcesGeneric(ObjectResourcesSummary, ExecutorDetails,
ExistingScheduleFunc)}.
+ * </li>
+ *
+ * <li>
+ * The tie between two nodes with same resource availability is broken
by using the node with lower minimum
+ * percentage used. This comparison was used in {@link
#sortObjectResourcesDefault(ObjectResourcesSummary, ExistingScheduleFunc)}
+ * but here it is made subservient to modified resource availbility
used in
+ * {@link #sortObjectResourcesGeneric(ObjectResourcesSummary,
ExecutorDetails, ExistingScheduleFunc)}.
+ *
+ * </li>
+ *
+ * @param allResources contains all individual ObjectResources as
well as cumulative stats
+ * @param exec executor for which the sorting is done
+ * @param existingScheduleFunc a function to get existing executors
already scheduled on this object
+ * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
+ */
+ private Iterable<ObjectResourcesItem> sortObjectResourcesCommon(
+ final ObjectResourcesSummary allResources, final ExecutorDetails
exec,
+ final ExistingScheduleFunc existingScheduleFunc) {
+ // Copy and modify allResources
+ ObjectResourcesSummary affinityBasedAllResources = new
ObjectResourcesSummary(allResources);
+ final NormalizedResourceOffer availableResourcesOverall =
allResources.getAvailableResourcesOverall();
+ final NormalizedResourceRequest requestedResources = (exec != null) ?
topologyDetails.getTotalResources(exec) : null;
+ affinityBasedAllResources.getObjectResources().forEach(
+ x -> {
+ if (requestedResources != null) {
+ // negate unrequested resources
+
x.availableResources.updateForRareResourceAffinity(requestedResources);
+ }
+ x.minResourcePercent =
availableResourcesOverall.calculateMinPercentageUsedBy(x.availableResources);
+ x.avgResourcePercent =
availableResourcesOverall.calculateAveragePercentageUsedBy(x.availableResources);
+
+ LOG.trace("for {}: minResourcePercent={},
avgResourcePercent={}, numExistingSchedule={}",
+ x.id, x.minResourcePercent, x.avgResourcePercent,
+ existingScheduleFunc.getNumExistingSchedule(x.id));
+ }
+ );
+
+ // Use the following comparator to sort
+ Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+ int execsScheduled1 =
existingScheduleFunc.getNumExistingSchedule(o1.id);
+ int execsScheduled2 =
existingScheduleFunc.getNumExistingSchedule(o2.id);
+ if (execsScheduled1 > execsScheduled2) {
+ return -1;
+ } else if (execsScheduled1 < execsScheduled2) {
+ return 1;
+ }
+ if (o1.minResourcePercent > o2.minResourcePercent) {
+ return -1;
+ } else if (o1.minResourcePercent < o2.minResourcePercent) {
+ return 1;
+ }
+ double o1Avg = o1.avgResourcePercent;
+ double o2Avg = o2.avgResourcePercent;
+ if (o1Avg > o2Avg) {
+ return -1;
+ } else if (o1Avg < o2Avg) {
+ return 1;
+ }
+ return o1.id.compareTo(o2.id);
+ };
+ TreeSet<ObjectResourcesItem> sortedObjectResources = new
TreeSet(comparator);
+
sortedObjectResources.addAll(affinityBasedAllResources.getObjectResources());
+ LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
+ return sortedObjectResources;
+ }
+
+ /**
+ * Sort objects by the following two criteria.
+ *
+ * <li>the number executors of the topology that needs to be scheduled is
already on the
+ * object (node or rack) in descending order. The reasoning to sort based
on criterion 1 is so we schedule the rest
+ * of a topology on the same object (node or rack) as the existing
executors of the topology.</li>
+ *
+ * <li>the subordinate/subservient resource availability percentage of a
rack in descending order We calculate the
+ * resource availability percentage by dividing the resource availability
of the object (node or rack) by the
+ * resource availability of the entire rack or cluster depending on if
object references a node or a rack.
+ * How this differs from the DefaultResourceAwareStrategy is that the
percentage boosts the node or rack if it is
+ * requested by the executor that the sorting is being done for and pulls
it down if it is not.
+ * By doing this calculation, objects (node or rack) that have exhausted
or little of one of the resources mentioned
+ * above will be ranked after racks that have more balanced resource
availability and nodes or racks that have
+ * resources that are not requested will be ranked below . So we will be
less likely to pick a rack that
+ * have a lot of one resource but a low amount of another and have a lot
of resources that are not requested by the executor.</li>
+ *
+ * @param allResources contains all individual ObjectResources as
well as cumulative stats
+ * @param exec executor for which the sorting is done
+ * @param existingScheduleFunc a function to get existing executors
already scheduled on this object
+ * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
+ */
+ @Deprecated
+ private Iterable<ObjectResourcesItem> sortObjectResourcesGeneric(
+ final ObjectResourcesSummary allResources, ExecutorDetails exec,
+ final ExistingScheduleFunc existingScheduleFunc) {
+ ObjectResourcesSummary affinityBasedAllResources = new
ObjectResourcesSummary(allResources);
+ NormalizedResourceRequest requestedResources =
topologyDetails.getTotalResources(exec);
+ affinityBasedAllResources.getObjectResources()
+ .forEach(x ->
x.availableResources.updateForRareResourceAffinity(requestedResources));
+ final NormalizedResourceOffer availableResourcesOverall =
allResources.getAvailableResourcesOverall();
+
+ Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+ int execsScheduled1 =
existingScheduleFunc.getNumExistingSchedule(o1.id);
+ int execsScheduled2 =
existingScheduleFunc.getNumExistingSchedule(o2.id);
+ if (execsScheduled1 > execsScheduled2) {
+ return -1;
+ } else if (execsScheduled1 < execsScheduled2) {
+ return 1;
+ }
+ double o1Avg =
availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources);
+ double o2Avg =
availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources);
+ if (o1Avg > o2Avg) {
+ return -1;
+ } else if (o1Avg < o2Avg) {
+ return 1;
+ }
+ return o1.id.compareTo(o2.id);
+ };
+ TreeSet<ObjectResourcesItem> sortedObjectResources = new
TreeSet<>(comparator);
+
sortedObjectResources.addAll(affinityBasedAllResources.getObjectResources());
+ LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
+ return sortedObjectResources;
+ }
+
+ /**
+ * Sort objects by the following two criteria.
+ *
+ * <li>the number executors of the topology that needs to be scheduled is
already on the
+ * object (node or rack) in descending order. The reasoning to sort based
on criterion 1 is so we schedule the rest
+ * of a topology on the same object (node or rack) as the existing
executors of the topology.</li>
+ *
+ * <li>the subordinate/subservient resource availability percentage of a
rack in descending order We calculate the
+ * resource availability percentage by dividing the resource availability
of the object (node or rack) by the
+ * resource availability of the entire rack or cluster depending on if
object references a node or a rack.
+ * By doing this calculation, objects (node or rack) that have exhausted
or little of one of the resources mentioned
+ * above will be ranked after racks that have more balanced resource
availability. So we will be less likely to pick
+ * a rack that have a lot of one resource but a low amount of another.</li>
+ *
+ * @param allResources contains all individual ObjectResources as
well as cumulative stats
+ * @param existingScheduleFunc a function to get existing executors
already scheduled on this object
+ * @return an {@link Iterable} of sorted {@link ObjectResourcesItem}
+ */
+ @Deprecated
+ private Iterable<ObjectResourcesItem> sortObjectResourcesDefault(
+ final ObjectResourcesSummary allResources,
+ final ExistingScheduleFunc existingScheduleFunc) {
+
+ final NormalizedResourceOffer availableResourcesOverall =
allResources.getAvailableResourcesOverall();
+ for (ObjectResourcesItem objectResources :
allResources.getObjectResources()) {
+ objectResources.minResourcePercent =
+
availableResourcesOverall.calculateMinPercentageUsedBy(objectResources.availableResources);
+ objectResources.avgResourcePercent =
+
availableResourcesOverall.calculateAveragePercentageUsedBy(objectResources.availableResources);
+ LOG.trace("for {}: minResourcePercent={}, avgResourcePercent={},
numExistingSchedule={}",
+ objectResources.id, objectResources.minResourcePercent,
objectResources.avgResourcePercent,
+
existingScheduleFunc.getNumExistingSchedule(objectResources.id));
+ }
+
+ Comparator<ObjectResourcesItem> comparator = (o1, o2) -> {
+ int execsScheduled1 =
existingScheduleFunc.getNumExistingSchedule(o1.id);
+ int execsScheduled2 =
existingScheduleFunc.getNumExistingSchedule(o2.id);
+ if (execsScheduled1 > execsScheduled2) {
+ return -1;
+ } else if (execsScheduled1 < execsScheduled2) {
+ return 1;
+ }
+ if (o1.minResourcePercent > o2.minResourcePercent) {
+ return -1;
+ } else if (o1.minResourcePercent < o2.minResourcePercent) {
+ return 1;
+ }
+ double diff = o1.avgResourcePercent - o2.avgResourcePercent;
+ if (diff > 0.0) {
+ return -1;
+ } else if (diff < 0.0) {
+ return 1;
+ }
+ return o1.id.compareTo(o2.id);
+ };
+ TreeSet<ObjectResourcesItem> sortedObjectResources = new
TreeSet<>(comparator);
+ sortedObjectResources.addAll(allResources.getObjectResources());
+ LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
+ return sortedObjectResources;
+ }
+
    /**
     * Hosts are sorted by two criteria.
     *
     * <p>1) the number of executors of the topology that are already scheduled on the host, in
     * descending order. The reasoning to sort based on criterion 1 is so we schedule the rest of
     * a topology on the same host as the existing executors of the topology.
     *
     * <p>2) the subordinate/subservient resource availability percentage of a host in descending
     * order. We calculate the resource availability percentage by dividing the resource
     * availability of the host by the resource availability of the entire rack. By doing this
     * calculation, hosts that have exhausted, or have little of, one of the resources mentioned
     * above will be ranked after hosts that have more balanced resource availability. So we will
     * be less likely to pick a host that has a lot of one resource but a low amount of another.
     *
     * @param availHosts a collection of all the hosts we want to sort
     * @param exec executor for which the sorting is done
     * @param rackId the rack id availHosts are a part of
     * @param scheduledCount per-host count of this topology's already-scheduled executors
     * @return an iterable of sorted hosts.
     */
    private Iterable<ObjectResourcesItem> sortHosts(
        Collection<String> availHosts, ExecutorDetails exec, String rackId,
        Map<String, AtomicInteger> scheduledCount) {
        ObjectResourcesSummary rackResourcesSummary = new ObjectResourcesSummary("RACK");
        availHosts.forEach(h -> {
            ObjectResourcesItem hostItem = new ObjectResourcesItem(h);
            for (RasNode x : hostnameToNodes.get(h)) {
                hostItem.add(new ObjectResourcesItem(x.getId(), x.getTotalAvailableResources(), x.getTotalResources(), 0, 0));
            }
            rackResourcesSummary.addObjectResourcesItem(hostItem);
        });

        LOG.debug(
            "Rack {}: Overall Avail [ {} ] Total [ {} ]",
            rackId,
            rackResourcesSummary.getAvailableResourcesOverall(),
            rackResourcesSummary.getTotalResourcesOverall());

        return sortObjectResources(
            rackResourcesSummary,
            exec,
            (hostId) -> {
                AtomicInteger count = scheduledCount.get(hostId);
                if (count == null) {
                    return 0;
                }
                return count.get();
            });
    }
+
    /**
     * Nodes are sorted by two criteria.
     *
     * <p>1) the number of executors of the topology that are already scheduled on the node, in
     * descending order. The reasoning to sort based on criterion 1 is so we schedule the rest of
     * a topology on the same node as the existing executors of the topology.
     *
     * <p>2) the subordinate/subservient resource availability percentage of a node in descending
     * order. We calculate the resource availability percentage by dividing the resource
     * availability of the node by the resource availability of the entire host. By doing this
     * calculation, nodes that have exhausted, or have little of, one of the resources mentioned
     * above will be ranked after nodes that have more balanced resource availability. So we will
     * be less likely to pick a node that has a lot of one resource but a low amount of another.
     *
     * @param availRasNodes a list of all the nodes we want to sort
     * @param hostId the host-id that availRasNodes are a part of
     * @return an {@link Iterable} of sorted {@link ObjectResourcesItem} for nodes.
     */
+ private Iterable<ObjectResourcesItem> sortNodes(
+ List<RasNode> availRasNodes, ExecutorDetails exec, String hostId,
+ Map<String, AtomicInteger> scheduledCount) {
Review comment:
Can you please help me understand why AtomicInteger is required? My
understanding is that scheduling is done sequentially in one thread.
##########
File path:
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
##########
@@ -189,7 +207,7 @@ protected void prepareForScheduling(Cluster cluster,
TopologyDetails topologyDet
LOG.debug("The max state search that will be used by topology {} is
{}", topologyDetails.getId(), maxStateSearch);
searcherState = createSearcherState();
- setNodeSorter(new NodeSorter(cluster, topologyDetails, nodeSortType));
+ setNodeSorter(new NodeSorterHostProximity(cluster, topologyDetails));
Review comment:
Do we not care about `nodeSortType` in this new
`NodeSorterHostProximity` class?
##########
File path:
storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
##########
@@ -282,6 +285,18 @@ boolean wouldFit(
*/
Map<String, List<String>> getNetworkTopography();
+ /**
+ * Get host -> rack map - the inverse of networkTopography.
+ */
+ default Map<String, String> getHostToRack() {
Review comment:
Since this is calculated based on getNetworkTopography, do we want to
cache it in the implementation class like `networkTopography` instead of
recomputing it every time?
##########
File path:
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java
##########
@@ -0,0 +1,768 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling.sorter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nonnull;
+import org.apache.storm.Config;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.RasNode;
+import org.apache.storm.scheduler.resource.RasNodes;
+import
org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import
org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import
org.apache.storm.scheduler.resource.strategies.scheduling.BaseResourceAwareStrategy;
+import
org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesItem;
+import
org.apache.storm.scheduler.resource.strategies.scheduling.ObjectResourcesSummary;
+import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NodeSorterHostProximity implements INodeSorter {
Review comment:
This class looks very similar to NodeSorter. Why do we want to keep both
of them? And if we need to keep both of them, does it make sense to have
NodeSorterHostProximity extend NodeSorter?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]