[
https://issues.apache.org/jira/browse/STORM-893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14945238#comment-14945238
]
ASF GitHub Bot commented on STORM-893:
--------------------------------------
Github user zhuoliu commented on a diff in the pull request:
https://github.com/apache/storm/pull/746#discussion_r41283265
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java ---
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.ExecutorDetails;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.scheduler.resource.Component;
+import backtype.storm.scheduler.resource.RAS_Node;
+
+public class ResourceAwareStrategy implements IStrategy {
+    private Logger LOG = null;
+    private Topologies _topologies;
+    private Cluster _cluster;
+    //Map key is the supervisor id and the value is the corresponding RAS_Node Object
+    private Map<String, RAS_Node> _availNodes;
+    private RAS_Node refNode = null;
+    /**
+     * supervisor id -> Node
+     */
+    private Map<String, RAS_Node> _nodes;
+    private Map<String, List<String>> _clusterInfo;
+
+    private final double CPU_WEIGHT = 1.0;
+    private final double MEM_WEIGHT = 1.0;
+    private final double NETWORK_WEIGHT = 1.0;
+
+    public ResourceAwareStrategy(Cluster cluster, Topologies topologies) {
+        _topologies = topologies;
+        _cluster = cluster;
+        _nodes = RAS_Node.getAllNodesFrom(cluster, _topologies);
+        _availNodes = this.getAvailNodes();
+        this.LOG = LoggerFactory.getLogger(this.getClass());
+        _clusterInfo = cluster.getNetworkTopography();
+        LOG.debug(this.getClusterInfo());
+    }
+
+    //the returned TreeMap keeps the Components sorted
+    private TreeMap<Integer, List<ExecutorDetails>> getPriorityToExecutorDetailsListMap(
+            Queue<Component> ordered__Component_list, Collection<ExecutorDetails> unassignedExecutors) {
+        TreeMap<Integer, List<ExecutorDetails>> retMap = new TreeMap<Integer, List<ExecutorDetails>>();
+        Integer rank = 0;
+        for (Component ras_comp : ordered__Component_list) {
+            retMap.put(rank, new ArrayList<ExecutorDetails>());
+            for(ExecutorDetails exec : ras_comp.execs) {
+                if(unassignedExecutors.contains(exec)) {
+                    retMap.get(rank).add(exec);
+                }
+            }
+            rank++;
+        }
+        return retMap;
+    }
+
+    public Map<WorkerSlot, Collection<ExecutorDetails>> schedule(TopologyDetails td) {
+        if (_availNodes.size() <= 0) {
+            LOG.warn("No available nodes to schedule tasks on!");
+            return null;
+        }
+        Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td);
+        Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<WorkerSlot, Collection<ExecutorDetails>>();
+        LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
+        Collection<ExecutorDetails> scheduledTasks = new ArrayList<ExecutorDetails>();
+        List<Component> spouts = this.getSpouts(_topologies, td);
+
+        if (spouts.size() == 0) {
+            LOG.error("Cannot find a Spout!");
+            return null;
+        }
+
+        Queue<Component> ordered__Component_list = bfs(_topologies, td, spouts);
+
+        Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(ordered__Component_list, unassignedExecutors);
+        Collection<ExecutorDetails> executorsNotScheduled = new HashSet<ExecutorDetails>(unassignedExecutors);
+        Integer longestPriorityListSize = this.getLongestPriorityListSize(priorityToExecutorMap);
+        //Pick the first executor with priority one, then the 1st exec with priority 2, so on an so forth.
+        //Once we reach the last priority, we go back to priority 1 and schedule the second task with priority 1.
+        for (int i = 0; i < longestPriorityListSize; i++) {
+            for (Entry<Integer, List<ExecutorDetails>> entry : priorityToExecutorMap.entrySet()) {
+                Iterator<ExecutorDetails> it = entry.getValue().iterator();
+                if (it.hasNext()) {
+                    ExecutorDetails exec = it.next();
+                    LOG.debug("\n\nAttempting to schedule: {} of component {}[avail {}] with rank {}",
+                            new Object[] { exec, td.getExecutorToComponent().get(exec),
+                                    td.getTaskResourceReqList(exec), entry.getKey() });
+                    WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
+                    if (targetSlot != null) {
+                        RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
+                        if(!schedulerAssignmentMap.containsKey(targetSlot)) {
+                            schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
+                        }
+
+                        schedulerAssignmentMap.get(targetSlot).add(exec);
+                        targetNode.consumeResourcesforTask(exec, td);
+                        scheduledTasks.add(exec);
+                        LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
+                                targetNode, targetNode.getAvailableMemoryResources(),
+                                targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
+                                targetNode.getTotalCpuResources(), targetSlot);
+                    } else {
+                        LOG.error("Not Enough Resources to schedule Task {}", exec);
+                    }
+                    it.remove();
+                }
+            }
+        }
+
+        executorsNotScheduled.removeAll(scheduledTasks);
+        LOG.debug("/* Scheduling left over task (most likely sys tasks) */");
+        // schedule left over system tasks
+        for (ExecutorDetails exec : executorsNotScheduled) {
+            WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
+            if (targetSlot != null) {
+                RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
+                if(schedulerAssignmentMap.containsKey(targetSlot) == false) {
+                    schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
+                }
+
+                schedulerAssignmentMap.get(targetSlot).add(exec);
+                targetNode.consumeResourcesforTask(exec, td);
+                scheduledTasks.add(exec);
+                LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
+                        targetNode, targetNode.getAvailableMemoryResources(),
+                        targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
+                        targetNode.getTotalCpuResources(), targetSlot);
+            } else {
+                LOG.error("Not Enough Resources to schedule Task {}", exec);
+            }
+        }
+        executorsNotScheduled.removeAll(scheduledTasks);
+        if (executorsNotScheduled.size() > 0) {
+            LOG.error("Not all executors successfully scheduled: {}",
+                    executorsNotScheduled);
+            schedulerAssignmentMap = null;
+        } else {
+            LOG.debug("All resources successfully scheduled!");
+        }
+        if (schedulerAssignmentMap == null) {
+            LOG.error("Topology {} not successfully scheduled!", td.getId());
+        }
+        return schedulerAssignmentMap;
+    }
+
+    private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+        WorkerSlot ws = null;
+        // first scheduling
+        if (this.refNode == null) {
+            String clus = this.getBestClustering();
+            ws = this.getBestWorker(exec, td, clus, scheduleAssignmentMap);
+        } else {
+            ws = this.getBestWorker(exec, td, scheduleAssignmentMap);
+        }
+        if(ws != null) {
+            this.refNode = this.idToNode(ws.getNodeId());
+        }
+        LOG.debug("reference node for the resource aware scheduler is: {}", this.refNode);
+        return ws;
+    }
+
+    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+        return this.getBestWorker(exec, td, null, scheduleAssignmentMap);
+    }
+
+    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, String clusterId, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+        double taskMem = td.getTotalMemReqTask(exec);
+        double taskCPU = td.getTotalCpuReqTask(exec);
+        List<RAS_Node> nodes;
+        if(clusterId != null) {
+            nodes = this.getAvailableNodesFromCluster(clusterId);
+
+        } else {
+            nodes = this.getAvailableNodes();
+        }
+        TreeMap<Double, RAS_Node> nodeRankMap = new TreeMap<Double, RAS_Node>();
--- End diff --
Maybe add some brief comments here to introduce the logic of choosing the best node.
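For example, the comment could say that candidate nodes are gathered (rack-local first when a clusterId is given), scored, and kept in the TreeMap so the best-scoring node is tried first. Below is a rough standalone sketch of that kind of ranking, only to illustrate the idea; it is not the PR code, and the class, the example numbers, and the scoring formula (a weighted distance between the task's request and the node's free resources) are my own assumptions:

```java
// Standalone illustration only -- NOT the PR code. Names and the scoring
// formula below are assumptions made for this example; the real logic lives
// in getBestWorker()/nodeRankMap further down in this file.
import java.util.Map;
import java.util.TreeMap;

public class NodeRankingSketch {

    // Hypothetical stand-in for RAS_Node, carrying only what the score needs.
    static class Node {
        final String id;
        final double availMem;
        final double availCpu;
        Node(String id, double availMem, double availCpu) {
            this.id = id;
            this.availMem = availMem;
            this.availCpu = availCpu;
        }
    }

    // Mirrors the weights declared at the top of ResourceAwareStrategy.
    static final double CPU_WEIGHT = 1.0;
    static final double MEM_WEIGHT = 1.0;

    // Assumed score: weighted distance between the task's request and the
    // node's free resources. Smaller means a tighter fit, i.e. a better node.
    static double score(Node n, double taskMem, double taskCpu) {
        double memTerm = MEM_WEIGHT * Math.pow(n.availMem - taskMem, 2);
        double cpuTerm = CPU_WEIGHT * Math.pow(n.availCpu - taskCpu, 2);
        return Math.sqrt(memTerm + cpuTerm);
    }

    public static void main(String[] args) {
        double taskMem = 512.0;   // MB requested by the executor
        double taskCpu = 50.0;    // CPU points requested by the executor

        // TreeMap keeps candidates sorted by score, so firstEntry() is the pick.
        TreeMap<Double, Node> nodeRankMap = new TreeMap<Double, Node>();
        for (Node n : new Node[] { new Node("sup-1", 2048.0, 300.0),
                                   new Node("sup-2", 600.0, 60.0) }) {
            nodeRankMap.put(score(n, taskMem, taskCpu), n);
        }

        Map.Entry<Double, Node> best = nodeRankMap.firstEntry();
        System.out.println("best node: " + best.getValue().id
                + " (score " + best.getKey() + ")");
    }
}
```

The actual metric in getBestWorker() may of course differ; the point is just to have a sentence or two above nodeRankMap explaining what the key means and why the first entry is the best choice.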
> Resource Aware Scheduling
> -------------------------
>
> Key: STORM-893
> URL: https://issues.apache.org/jira/browse/STORM-893
> Project: Apache Storm
> Issue Type: Umbrella
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Boyang Jerry Peng
> Attachments: resource_aware_scheduler_api.pdf
>
>
> At Yahoo we have been working on resource aware scheduling in Storm, based
> on some work done in academia. This rollup ticket tracks the complete
> project, with several sub-tasks: some are already done and need to be pushed
> back, and others we have not started on yet.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)