Github user zhuoliu commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r41283467

--- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java ---
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.ExecutorDetails;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.scheduler.resource.Component;
+import backtype.storm.scheduler.resource.RAS_Node;
+
+public class ResourceAwareStrategy implements IStrategy {
+    private Logger LOG = null;
+    private Topologies _topologies;
+    private Cluster _cluster;
+    //Map key is the supervisor id and the value is the corresponding RAS_Node Object
+    private Map<String, RAS_Node> _availNodes;
+    private RAS_Node refNode = null;
+    /**
+     * supervisor id -> Node
+     */
+    private Map<String, RAS_Node> _nodes;
+    private Map<String, List<String>> _clusterInfo;
+
+    private final double CPU_WEIGHT = 1.0;
+    private final double MEM_WEIGHT = 1.0;
+    private final double NETWORK_WEIGHT = 1.0;
+
+    public ResourceAwareStrategy(Cluster cluster, Topologies topologies) {
+        _topologies = topologies;
+        _cluster = cluster;
+        _nodes = RAS_Node.getAllNodesFrom(cluster, _topologies);
+        _availNodes = this.getAvailNodes();
+        this.LOG = LoggerFactory.getLogger(this.getClass());
+        _clusterInfo = cluster.getNetworkTopography();
+        LOG.debug(this.getClusterInfo());
+    }
+
+    //the returned TreeMap keeps the Components sorted
+    private TreeMap<Integer, List<ExecutorDetails>> getPriorityToExecutorDetailsListMap(
+            Queue<Component> ordered__Component_list, Collection<ExecutorDetails> unassignedExecutors) {
+        TreeMap<Integer, List<ExecutorDetails>> retMap = new TreeMap<Integer, List<ExecutorDetails>>();
+        Integer rank = 0;
+        for (Component ras_comp : ordered__Component_list) {
+            retMap.put(rank, new ArrayList<ExecutorDetails>());
+            for(ExecutorDetails exec : ras_comp.execs) {
+                if(unassignedExecutors.contains(exec)) {
+                    retMap.get(rank).add(exec);
+                }
+            }
+            rank++;
+        }
+        return retMap;
+    }
+
+    public Map<WorkerSlot, Collection<ExecutorDetails>> schedule(TopologyDetails td) {
+        if (_availNodes.size() <= 0) {
+            LOG.warn("No available nodes to schedule tasks on!");
+            return null;
+        }
+        Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td);
+        Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<WorkerSlot, Collection<ExecutorDetails>>();
+        LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
+        Collection<ExecutorDetails> scheduledTasks = new ArrayList<ExecutorDetails>();
+        List<Component> spouts = this.getSpouts(_topologies, td);
+
+        if (spouts.size() == 0) {
+            LOG.error("Cannot find a Spout!");
+            return null;
+        }
+
+        Queue<Component> ordered__Component_list = bfs(_topologies, td, spouts);
+
+        Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(ordered__Component_list, unassignedExecutors);
+        Collection<ExecutorDetails> executorsNotScheduled = new HashSet<ExecutorDetails>(unassignedExecutors);
+        Integer longestPriorityListSize = this.getLongestPriorityListSize(priorityToExecutorMap);
+        //Pick the first executor with priority one, then the 1st exec with priority 2, so on an so forth.
+        //Once we reach the last priority, we go back to priority 1 and schedule the second task with priority 1.
+        for (int i = 0; i < longestPriorityListSize; i++) {
+            for (Entry<Integer, List<ExecutorDetails>> entry : priorityToExecutorMap.entrySet()) {
+                Iterator<ExecutorDetails> it = entry.getValue().iterator();
+                if (it.hasNext()) {
+                    ExecutorDetails exec = it.next();
+                    LOG.debug("\n\nAttempting to schedule: {} of component {}[avail {}] with rank {}",
+                            new Object[] { exec, td.getExecutorToComponent().get(exec),
+                                    td.getTaskResourceReqList(exec), entry.getKey() });
+                    WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
+                    if (targetSlot != null) {
+                        RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
+                        if(!schedulerAssignmentMap.containsKey(targetSlot)) {
+                            schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
+                        }
+
+                        schedulerAssignmentMap.get(targetSlot).add(exec);
+                        targetNode.consumeResourcesforTask(exec, td);
+                        scheduledTasks.add(exec);
+                        LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
+                                targetNode, targetNode.getAvailableMemoryResources(),
+                                targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
+                                targetNode.getTotalCpuResources(), targetSlot);
+                    } else {
+                        LOG.error("Not Enough Resources to schedule Task {}", exec);
+                    }
+                    it.remove();
+                }
+            }
+        }
+
+        executorsNotScheduled.removeAll(scheduledTasks);
+        LOG.debug("/* Scheduling left over task (most likely sys tasks) */");
+        // schedule left over system tasks
+        for (ExecutorDetails exec : executorsNotScheduled) {
+            WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
+            if (targetSlot != null) {
+                RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
+                if(schedulerAssignmentMap.containsKey(targetSlot) == false) {
+                    schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
+                }
+
+                schedulerAssignmentMap.get(targetSlot).add(exec);
+                targetNode.consumeResourcesforTask(exec, td);
+                scheduledTasks.add(exec);
+                LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
+                        targetNode, targetNode.getAvailableMemoryResources(),
+                        targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
+                        targetNode.getTotalCpuResources(), targetSlot);
+            } else {
+                LOG.error("Not Enough Resources to schedule Task {}", exec);
+            }
+        }
+
+        executorsNotScheduled.removeAll(scheduledTasks);
+        if (executorsNotScheduled.size() > 0) {
+            LOG.error("Not all executors successfully scheduled: {}",
+                    executorsNotScheduled);
+            schedulerAssignmentMap = null;
+        } else {
+            LOG.debug("All resources successfully scheduled!");
+        }
+        if (schedulerAssignmentMap == null) {
+            LOG.error("Topology {} not successfully scheduled!", td.getId());
+        }
+        return schedulerAssignmentMap;
+    }
+
+    private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+        WorkerSlot ws = null;
+        // first scheduling
+        if (this.refNode == null) {
+            String clus = this.getBestClustering();
+            ws = this.getBestWorker(exec, td, clus, scheduleAssignmentMap);
+        } else {
+            ws = this.getBestWorker(exec, td, scheduleAssignmentMap);
+        }
+        if(ws != null) {
+            this.refNode = this.idToNode(ws.getNodeId());
+        }
+        LOG.debug("reference node for the resource aware scheduler is: {}", this.refNode);
+        return ws;
+    }
+
+    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+        return this.getBestWorker(exec, td, null, scheduleAssignmentMap);
+    }
+
+    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, String clusterId, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+        double taskMem = td.getTotalMemReqTask(exec);
+        double taskCPU = td.getTotalCpuReqTask(exec);
+        List<RAS_Node> nodes;
+        if(clusterId != null) {
+            nodes = this.getAvailableNodesFromCluster(clusterId);
+
+        } else {
+            nodes = this.getAvailableNodes();
+        }
+        TreeMap<Double, RAS_Node> nodeRankMap = new TreeMap<Double, RAS_Node>();
+        for (RAS_Node n : nodes) {
+            if(n.getFreeSlots().size()>0) {
+                if (n.getAvailableMemoryResources() >= taskMem
+                        && n.getAvailableCpuResources() >= taskCPU) {
+                    double a = Math.pow((taskCPU - n.getAvailableCpuResources())
+                            * this.CPU_WEIGHT, 2);
+                    double b = Math.pow((taskMem - n.getAvailableMemoryResources())
+                            * this.MEM_WEIGHT, 2);
+                    double c = 0.0;
+                    if(this.refNode != null) {
+                        c = Math.pow(this.distToNode(this.refNode, n)
+                                * this.NETWORK_WEIGHT, 2);
+                    }
+                    double distance = Math.sqrt(a + b + c);
+                    nodeRankMap.put(distance, n);
+                }
+            }
+        }
+
+        for(Map.Entry<Double, RAS_Node> entry : nodeRankMap.entrySet()) {
+            RAS_Node n = entry.getValue();
+            for(WorkerSlot ws : n.getFreeSlots()) {
+                if(checkWorkerConstraints(exec, ws, td, scheduleAssignmentMap)) {
+                    return ws;
+                }
+            }
+        }
+        return null;
+    }
+
+    private String getBestClustering() {
+        String bestCluster = null;
+        Double mostRes = 0.0;
+        for (Entry<String, List<String>> cluster : _clusterInfo
+                .entrySet()) {
+            Double clusterTotalRes = this.getTotalClusterRes(cluster.getValue());
+            if (clusterTotalRes > mostRes) {
+                mostRes = clusterTotalRes;
+                bestCluster = cluster.getKey();
+            }
+        }
+        return bestCluster;
+    }
+
+    private Double getTotalClusterRes(List<String> cluster) {
+        Double res = 0.0;
+        for (String node : cluster) {
+            res += _availNodes.get(this.NodeHostnameToId(node))
+                    .getAvailableMemoryResources()
+                    + _availNodes.get(this.NodeHostnameToId(node))
+                            .getAvailableCpuResources();
+        }
+        return res;
+    }
+
+    private Double distToNode(RAS_Node src, RAS_Node dest) {
+        if (src.getId().equals(dest.getId())==true) {
+            return 1.0;
+        }else if (this.NodeToCluster(src) == this.NodeToCluster(dest)) {
+            return 2.0;
+        } else {
+            return 3.0;
+        }
+    }
+
+    private String NodeToCluster(RAS_Node node) {
+        for (Entry<String, List<String>> entry : _clusterInfo
+                .entrySet()) {
+            if (entry.getValue().contains(node.getHostname())) {
+                return entry.getKey();
+            }
+        }
+        LOG.error("Node: {} not found in any clusters", node.getHostname());
+        return null;
+    }
+
+    private List<RAS_Node> getAvailableNodes() {
+        LinkedList<RAS_Node> nodes = new LinkedList<RAS_Node>();
+        for (String clusterId : _clusterInfo.keySet()) {
+            nodes.addAll(this.getAvailableNodesFromCluster(clusterId));
+        }
+        return nodes;
+    }
+
+    private List<RAS_Node> getAvailableNodesFromCluster(String clus) {
+        List<RAS_Node> retList = new ArrayList<RAS_Node>();
+        for (String node_id : _clusterInfo.get(clus)) {
+            retList.add(_availNodes.get(this
+                    .NodeHostnameToId(node_id)));
+        }
+        return retList;
+    }
+
+    private List<WorkerSlot> getAvailableWorkersFromCluster(String clusterId) {
+        List<RAS_Node> nodes = this.getAvailableNodesFromCluster(clusterId);
+        List<WorkerSlot> workers = new LinkedList<WorkerSlot>();
+        for(RAS_Node node : nodes) {
+            workers.addAll(node.getFreeSlots());
+        }
+        return workers;
+    }
+
+    private List<WorkerSlot> getAvailableWorker() {
+        List<WorkerSlot> workers = new LinkedList<WorkerSlot>();
+        for (String clusterId : _clusterInfo.keySet()) {
+            workers.addAll(this.getAvailableWorkersFromCluster(clusterId));
+        }
+        return workers;
+    }
+
+    /**
+     * In case in the future RAS can only use a subset of nodes
+     */
+    private Map<String, RAS_Node> getAvailNodes() {
+        return _nodes;
+    }
+
+    /**
+     * Breadth first traversal of the topology DAG
+     * @param topologies
+     * @param td
+     * @param spouts
+     * @return A partial ordering of components
+     */
+    private Queue<Component> bfs(Topologies topologies, TopologyDetails td, List<Component> spouts) {
+        // Since queue is a interface
+        Queue<Component> ordered__Component_list = new LinkedList<Component>();
+        HashMap<String, Component> visited = new HashMap<String, Component>();
+
+        /* start from each spout that is not visited, each does a breadth-first traverse */
+        for (Component spout : spouts) {
+            if (!visited.containsKey(spout.id)) {
+                Queue<Component> queue = new LinkedList<Component>();
+                queue.offer(spout);
+                while (!queue.isEmpty()) {
+                    Component comp = queue.poll();
+                    visited.put(comp.id, comp);
+                    ordered__Component_list.add(comp);
+                    List<String> neighbors = new ArrayList<String>();
+                    neighbors.addAll(comp.children);
+                    neighbors.addAll(comp.parents);
+                    for (String nbID : neighbors) {
+                        if (!visited.containsKey(nbID)) {
+                            Component child = topologies.getAllComponents().get(td.getId()).get(nbID);
+                            queue.offer(child);
+                        }
+                    }
+                }
+            }
+        }
+        return ordered__Component_list;
+    }
+
+    private List<Component> getSpouts(Topologies topologies, TopologyDetails td) {
+        List<Component> spouts = new ArrayList<Component>();
+        for (Component c : topologies.getAllComponents().get(td.getId())
+                .values()) {
+            if (c.type == Component.ComponentType.SPOUT) {
+                spouts.add(c);
+            }
+        }
+        return spouts;
+    }
+
+    private Integer getLongestPriorityListSize(Map<Integer, List<ExecutorDetails>> priorityToExecutorMap) {
+        Integer mostNum = 0;
+        for (List<ExecutorDetails> execs : priorityToExecutorMap.values()) {
+            Integer numExecs = execs.size();
+            if (mostNum < numExecs) {
+                mostNum = numExecs;
+            }
+        }
+        return mostNum;
+    }
+
+    /**
+     * Get the remaining amount memory that can be assigned to a worker given the set worker max heap size
+     * @param ws
+     * @param td
+     * @param scheduleAssignmentMap
+     * @return The remaining amount of memory
+     */
+    private Double getWorkerScheduledMemoryAvailable(WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+        Double memScheduleUsed = this.getWorkerScheduledMemoryUse(ws, td, scheduleAssignmentMap);
+        return td.getTopologyWorkerMaxHeapSize() - memScheduleUsed;
+    }
+
+    /**
+     * Get the amount of memory already assigned to a worker
+     * @param ws
+     * @param td
+     * @param scheduleAssignmentMap
+     * @return the amount of memory
+     */
+    private Double getWorkerScheduledMemoryUse(WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+        Double totalMem = 0.0;
+        Collection<ExecutorDetails> execs = scheduleAssignmentMap.get(ws);
+        if(execs != null) {
+            for(ExecutorDetails exec : execs) {
+                totalMem += td.getTotalMemReqTask(exec);
+            }
+        }
+        return totalMem;
+    }
+
+    /**
+     * Checks whether we can schedule an Executor exec on the worker slot ws
+     * @param exec
+     * @param ws
+     * @param td
+     * @param scheduleAssignmentMap
+     * @return a boolean: True denoting the exec can be scheduled on ws and false if it cannot
+     */
+    private boolean checkWorkerConstraints(ExecutorDetails exec, WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
--- End diff --

Since checkWorkerConstraints currently only considers memory, we may want to add a comment noting that a CPU constraint check will also be added in the near future.
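For illustration only, here is a rough sketch (not code from this PR, whose checkWorkerConstraints body is not shown in the quoted diff) of how such a comment and an eventual CPU check could look. getWorkerScheduledMemoryAvailable and td.getTotalMemReqTask appear in this diff; getWorkerScheduledCpuAvailable is a hypothetical future helper:

    // Sketch only -- not the PR's implementation. Assumes the memory check compares the
    // executor's memory request against the remaining worker max heap budget computed by
    // getWorkerScheduledMemoryAvailable (present in this diff).
    private boolean checkWorkerConstraints(ExecutorDetails exec, WorkerSlot ws, TopologyDetails td,
            Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
        // For now only the memory constraint (topology worker max heap size) is enforced.
        // TODO: also enforce a CPU constraint in the near future, e.g.
        //   getWorkerScheduledCpuAvailable(ws, td, scheduleAssignmentMap) >= td.getTotalCpuReqTask(exec)
        return td.getTotalMemReqTask(exec)
                <= this.getWorkerScheduledMemoryAvailable(ws, td, scheduleAssignmentMap);
    }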