[ 
https://issues.apache.org/jira/browse/STORM-893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14946164#comment-14946164
 ] 

ASF GitHub Bot commented on STORM-893:
--------------------------------------

Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r41347393
  
    --- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
 ---
    @@ -0,0 +1,478 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package backtype.storm.scheduler.resource.strategies;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Queue;
    +import java.util.TreeMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import backtype.storm.scheduler.Cluster;
    +import backtype.storm.scheduler.ExecutorDetails;
    +import backtype.storm.scheduler.Topologies;
    +import backtype.storm.scheduler.TopologyDetails;
    +import backtype.storm.scheduler.WorkerSlot;
    +import backtype.storm.scheduler.resource.Component;
    +import backtype.storm.scheduler.resource.RAS_Node;
    +
    +public class ResourceAwareStrategy implements IStrategy {
    +    private Logger LOG = null;
    +    private Topologies _topologies;
    +    private Cluster _cluster;
    +    //Map key is the supervisor id and the value is the corresponding 
RAS_Node Object 
    +    private Map<String, RAS_Node> _availNodes;
    +    private RAS_Node refNode = null;
    +    /**
    +     * supervisor id -> Node
    +     */
    +    private Map<String, RAS_Node> _nodes;
    +    private Map<String, List<String>> _clusterInfo;
    +
    +    private final double CPU_WEIGHT = 1.0;
    +    private final double MEM_WEIGHT = 1.0;
    +    private final double NETWORK_WEIGHT = 1.0;
    +
    +    public ResourceAwareStrategy(Cluster cluster, Topologies topologies) {
    +        _topologies = topologies;
    +        _cluster = cluster;
    +        _nodes = RAS_Node.getAllNodesFrom(cluster, _topologies);
    +        _availNodes = this.getAvailNodes();
    +        this.LOG = LoggerFactory.getLogger(this.getClass());
    +        _clusterInfo = cluster.getNetworkTopography();
    +        LOG.debug(this.getClusterInfo());
    +    }
    +
    +    //the returned TreeMap keeps the Components sorted
    +    private TreeMap<Integer, List<ExecutorDetails>> 
getPriorityToExecutorDetailsListMap(
    +            Queue<Component> ordered__Component_list, 
Collection<ExecutorDetails> unassignedExecutors) {
    +        TreeMap<Integer, List<ExecutorDetails>> retMap = new 
TreeMap<Integer, List<ExecutorDetails>>();
    +        Integer rank = 0;
    +        for (Component ras_comp : ordered__Component_list) {
    +            retMap.put(rank, new ArrayList<ExecutorDetails>());
    +            for(ExecutorDetails exec : ras_comp.execs) {
    +                if(unassignedExecutors.contains(exec)) {
    +                    retMap.get(rank).add(exec);
    +                }
    +            }
    +            rank++;
    +        }
    +        return retMap;
    +    }
    +
    +    public Map<WorkerSlot, Collection<ExecutorDetails>> 
schedule(TopologyDetails td) {
    +        if (_availNodes.size() <= 0) {
    +            LOG.warn("No available nodes to schedule tasks on!");
    +            return null;
    +        }
    +        Collection<ExecutorDetails> unassignedExecutors = 
_cluster.getUnassignedExecutors(td);
    +        Map<WorkerSlot, Collection<ExecutorDetails>> 
schedulerAssignmentMap = new HashMap<WorkerSlot, Collection<ExecutorDetails>>();
    +        LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
    +        Collection<ExecutorDetails> scheduledTasks = new 
ArrayList<ExecutorDetails>();
    +        List<Component> spouts = this.getSpouts(_topologies, td);
    +
    +        if (spouts.size() == 0) {
    +            LOG.error("Cannot find a Spout!");
    +            return null;
    +        }
    +
    +        Queue<Component> ordered__Component_list = bfs(_topologies, td, 
spouts);
    +
    +        Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = 
getPriorityToExecutorDetailsListMap(ordered__Component_list, 
unassignedExecutors);
    +        Collection<ExecutorDetails> executorsNotScheduled = new 
HashSet<ExecutorDetails>(unassignedExecutors);
    +        Integer longestPriorityListSize = 
this.getLongestPriorityListSize(priorityToExecutorMap);
    +        //Pick the first executor with priority one, then the 1st exec 
with priority 2, so on and so forth. 
    +        //Once we reach the last priority, we go back to priority 1 and 
schedule the second task with priority 1.
    +        for (int i = 0; i < longestPriorityListSize; i++) {
    +            for (Entry<Integer, List<ExecutorDetails>> entry : 
priorityToExecutorMap.entrySet()) {
    +                Iterator<ExecutorDetails> it = entry.getValue().iterator();
    +                if (it.hasNext()) {
    +                    ExecutorDetails exec = it.next();
    +                    LOG.debug("\n\nAttempting to schedule: {} of component 
{}[avail {}] with rank {}",
    +                            new Object[] { exec, 
td.getExecutorToComponent().get(exec),
    +                    td.getTaskResourceReqList(exec), entry.getKey() });
    +                    WorkerSlot targetSlot = this.findWorkerForExec(exec, 
td, schedulerAssignmentMap);
    +                    if (targetSlot != null) {
    +                        RAS_Node targetNode = 
this.idToNode(targetSlot.getNodeId());
    +                        
if(!schedulerAssignmentMap.containsKey(targetSlot)) {
    +                            schedulerAssignmentMap.put(targetSlot, new 
LinkedList<ExecutorDetails>());
    +                        }
    +                       
    +                        schedulerAssignmentMap.get(targetSlot).add(exec);
    +                        targetNode.consumeResourcesforTask(exec, td);
    +                        scheduledTasks.add(exec);
    +                        LOG.debug("TASK {} assigned to Node: {} avail 
[mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
    +                                targetNode, 
targetNode.getAvailableMemoryResources(),
    +                                targetNode.getAvailableCpuResources(), 
targetNode.getTotalMemoryResources(),
    +                                targetNode.getTotalCpuResources(), 
targetSlot);
    +                    } else {
    +                        LOG.error("Not Enough Resources to schedule Task 
{}", exec);
    +                    }
    +                    it.remove();
    +                }
    +            }
    +        }
    +
    +        executorsNotScheduled.removeAll(scheduledTasks);
    +        LOG.debug("/* Scheduling left over task (most likely sys tasks) 
*/");
    +        // schedule left over system tasks
    +        for (ExecutorDetails exec : executorsNotScheduled) {
    +            WorkerSlot targetSlot = this.findWorkerForExec(exec, td, 
schedulerAssignmentMap);
    +            if (targetSlot != null) {
    +                RAS_Node targetNode = 
this.idToNode(targetSlot.getNodeId());
    +                if(schedulerAssignmentMap.containsKey(targetSlot) == 
false) {
    +                    schedulerAssignmentMap.put(targetSlot, new 
LinkedList<ExecutorDetails>());
    +                }
    +               
    +                schedulerAssignmentMap.get(targetSlot).add(exec);
    +                targetNode.consumeResourcesforTask(exec, td);
    +                scheduledTasks.add(exec);
    +                LOG.debug("TASK {} assigned to Node: {} avail [mem: {} 
cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
    +                        targetNode, 
targetNode.getAvailableMemoryResources(),
    +                        targetNode.getAvailableCpuResources(), 
targetNode.getTotalMemoryResources(),
    +                        targetNode.getTotalCpuResources(), targetSlot);
    +            } else {
    +                LOG.error("Not Enough Resources to schedule Task {}", 
exec);
    +            }
    +        }
    +        executorsNotScheduled.removeAll(scheduledTasks);
    +        if (executorsNotScheduled.size() > 0) {
    +            LOG.error("Not all executors successfully scheduled: {}",
    +                    executorsNotScheduled);
    +            schedulerAssignmentMap = null;
    +        } else {
    +            LOG.debug("All resources successfully scheduled!");
    +        }
    +        if (schedulerAssignmentMap == null) {
    +            LOG.error("Topology {} not successfully scheduled!", 
td.getId());
    +        }
    +        return schedulerAssignmentMap;
    +    }
    +
    +    private WorkerSlot findWorkerForExec(ExecutorDetails exec, 
TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> 
scheduleAssignmentMap) {
    +      WorkerSlot ws = null;
    +      // first scheduling
    +      if (this.refNode == null) {
    +          String clus = this.getBestClustering();
    +          ws = this.getBestWorker(exec, td, clus, scheduleAssignmentMap);
    +      } else {
    +          ws = this.getBestWorker(exec, td, scheduleAssignmentMap);
    +      }
    +      if(ws != null) {
    +          this.refNode = this.idToNode(ws.getNodeId());
    +      }
    +      LOG.debug("reference node for the resource aware scheduler is: {}", 
this.refNode);
    +      return ws;
    +    }
    +
    +    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails 
td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
    +        return this.getBestWorker(exec, td, null, scheduleAssignmentMap);
    +    }
    +
    +    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails 
td, String clusterId, Map<WorkerSlot, Collection<ExecutorDetails>> 
scheduleAssignmentMap) {
    +        double taskMem = td.getTotalMemReqTask(exec);
    +        double taskCPU = td.getTotalCpuReqTask(exec);
    +        List<RAS_Node> nodes;
    +        if(clusterId != null) {
    +            nodes = this.getAvailableNodesFromCluster(clusterId);
    +            
    +        } else {
    +            nodes = this.getAvailableNodes();
    +        }
    +        TreeMap<Double, RAS_Node> nodeRankMap = new TreeMap<Double, 
RAS_Node>();
    +        for (RAS_Node n : nodes) {
    +            if(n.getFreeSlots().size()>0) {
    +                if (n.getAvailableMemoryResources() >= taskMem
    +                      && n.getAvailableCpuResources() >= taskCPU) {
    +                  double a = Math.pow((taskCPU - 
n.getAvailableCpuResources())
    +                          * this.CPU_WEIGHT, 2);
    +                  double b = Math.pow((taskMem - 
n.getAvailableMemoryResources())
    +                          * this.MEM_WEIGHT, 2);
    +                  double c = 0.0;
    +                  if(this.refNode != null) {
    +                      c = Math.pow(this.distToNode(this.refNode, n)
    +                              * this.NETWORK_WEIGHT, 2);
    +                  }
    +                  double distance = Math.sqrt(a + b + c);
    +                  nodeRankMap.put(distance, n);
    +                }
    +            }
    +        }
    +        
    +        for(Map.Entry<Double, RAS_Node> entry : nodeRankMap.entrySet()) {
    +            RAS_Node n = entry.getValue();
    +            for(WorkerSlot ws : n.getFreeSlots()) {
    +                if(checkWorkerConstraints(exec, ws, td, 
scheduleAssignmentMap)) {
    +                    return ws;
    +                }
    +            }
    +        }
    +        return null;
    +    }
    +
    +    private String getBestClustering() {
    +        String bestCluster = null;
    +        Double mostRes = 0.0;
    +        for (Entry<String, List<String>> cluster : _clusterInfo
    +                .entrySet()) {
    +            Double clusterTotalRes = 
this.getTotalClusterRes(cluster.getValue());
    +            if (clusterTotalRes > mostRes) {
    +                mostRes = clusterTotalRes;
    +                bestCluster = cluster.getKey();
    +            }
    +        }
    +        return bestCluster;
    +    }
    +
    +    private Double getTotalClusterRes(List<String> cluster) {
    +        Double res = 0.0;
    +        for (String node : cluster) {
    +            res += _availNodes.get(this.NodeHostnameToId(node))
    +                    .getAvailableMemoryResources()
    +                    + _availNodes.get(this.NodeHostnameToId(node))
    +                    .getAvailableCpuResources();
    +        }
    +        return res;
    +    }
    +
    +    private Double distToNode(RAS_Node src, RAS_Node dest) {
    +        if (src.getId().equals(dest.getId())==true) {
    +            return 1.0;
    +        }else if (this.NodeToCluster(src) == this.NodeToCluster(dest)) {
    +            return 2.0;
    +        } else {
    +            return 3.0;
    +        }
    +    }
    +
    +    private String NodeToCluster(RAS_Node node) {
    +        for (Entry<String, List<String>> entry : _clusterInfo
    +                .entrySet()) {
    +            if (entry.getValue().contains(node.getHostname())) {
    +                return entry.getKey();
    +            }
    +        }
    +        LOG.error("Node: {} not found in any clusters", 
node.getHostname());
    +        return null;
    +    }
    +    
    +    private List<RAS_Node> getAvailableNodes() {
    +        LinkedList<RAS_Node> nodes = new LinkedList<RAS_Node>();
    +        for (String clusterId : _clusterInfo.keySet()) {
    +            nodes.addAll(this.getAvailableNodesFromCluster(clusterId));
    +        }
    +        return nodes;
    +    }
    +
    +    private List<RAS_Node> getAvailableNodesFromCluster(String clus) {
    +        List<RAS_Node> retList = new ArrayList<RAS_Node>();
    +        for (String node_id : _clusterInfo.get(clus)) {
    +            retList.add(_availNodes.get(this
    +                    .NodeHostnameToId(node_id)));
    +        }
    +        return retList;
    +    }
    +
    +    private List<WorkerSlot> getAvailableWorkersFromCluster(String 
clusterId) {
    +        List<RAS_Node> nodes = 
this.getAvailableNodesFromCluster(clusterId);
    +        List<WorkerSlot> workers = new LinkedList<WorkerSlot>();
    +        for(RAS_Node node : nodes) {
    +            workers.addAll(node.getFreeSlots());
    +        }
    +        return workers;
    +    }
    +
    +    private List<WorkerSlot> getAvailableWorker() {
    +        List<WorkerSlot> workers = new LinkedList<WorkerSlot>();
    +        for (String clusterId : _clusterInfo.keySet()) {
    +            workers.addAll(this.getAvailableWorkersFromCluster(clusterId));
    +        }
    +        return workers;
    +    }
    +
    +    /**
    +     * In case in the future RAS can only use a subset of nodes
    +     */
    +    private Map<String, RAS_Node> getAvailNodes() {
    +        return _nodes;
    +    }
    +
    +    /**
    +     * Breadth first traversal of the topology DAG
    +     * @param topologies
    +     * @param td
    +     * @param spouts
    +     * @return A partial ordering of components
    +     */
    +    private Queue<Component> bfs(Topologies topologies, TopologyDetails 
td, List<Component> spouts) {
    +        // Since queue is a interface
    +        Queue<Component> ordered__Component_list = new 
LinkedList<Component>();
    +        HashMap<String, Component> visited = new HashMap<String, 
Component>();
    +
    +        /* start from each spout that is not visited, each does a 
breadth-first traverse */
    +        for (Component spout : spouts) {
    +            if (!visited.containsKey(spout.id)) {
    +                Queue<Component> queue = new LinkedList<Component>();
    +                queue.offer(spout);
    +                while (!queue.isEmpty()) {
    +                    Component comp = queue.poll();
    +                    visited.put(comp.id, comp);
    +                    ordered__Component_list.add(comp);
    +                    List<String> neighbors = new ArrayList<String>();
    +                    neighbors.addAll(comp.children);
    +                    neighbors.addAll(comp.parents);
    +                    for (String nbID : neighbors) {
    +                        if (!visited.containsKey(nbID)) {
    +                            Component child = 
topologies.getAllComponents().get(td.getId()).get(nbID);
    +                            queue.offer(child);
    +                        }
    +                    }
    +                }
    +            }
    +        }
    +        return ordered__Component_list;
    +    }
    +
    +    private List<Component> getSpouts(Topologies topologies, 
TopologyDetails td) {
    +        List<Component> spouts = new ArrayList<Component>();
    +        for (Component c : topologies.getAllComponents().get(td.getId())
    +                .values()) {
    +            if (c.type == Component.ComponentType.SPOUT) {
    +                spouts.add(c);
    +            }
    +        }
    +        return spouts;
    +    }
    +
    +    private Integer getLongestPriorityListSize(Map<Integer, 
List<ExecutorDetails>> priorityToExecutorMap) {
    +        Integer mostNum = 0;
    +        for (List<ExecutorDetails> execs : priorityToExecutorMap.values()) 
{
    +            Integer numExecs = execs.size();
    +            if (mostNum < numExecs) {
    +                mostNum = numExecs;
    +            }
    +        }
    +        return mostNum;
    +    }
    +
    +    /**
    +     * Get the remaining amount memory that can be assigned to a worker 
given the set worker max heap size
    +     * @param ws
    +     * @param td
    +     * @param scheduleAssignmentMap
    +     * @return The remaining amount of memory
    +     */
    +    private Double getWorkerScheduledMemoryAvailable(WorkerSlot ws, 
TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> 
scheduleAssignmentMap) {
    +        Double memScheduleUsed = this.getWorkerScheduledMemoryUse(ws, td, 
scheduleAssignmentMap);
    +        return td.getTopologyWorkerMaxHeapSize() - memScheduleUsed;
    +    }
    +
    +    /**
    +     * Get the amount of memory already assigned to a worker
    +     * @param ws
    +     * @param td
    +     * @param scheduleAssignmentMap
    +     * @return the amount of memory
    +     */
    +    private Double getWorkerScheduledMemoryUse(WorkerSlot ws, 
TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> 
scheduleAssignmentMap) {
    +        Double totalMem = 0.0;
    +        Collection<ExecutorDetails> execs = scheduleAssignmentMap.get(ws);
    +        if(execs != null) {
    +            for(ExecutorDetails exec : execs) {
    +                totalMem += td.getTotalMemReqTask(exec);
    +            }
    +        } 
    +        return totalMem;
    +    }
    +
    +    /**
    +     * Checks whether we can schedule an Executor exec on the worker slot 
ws
    +     * @param exec
    +     * @param ws
    +     * @param td
    +     * @param scheduleAssignmentMap
    +     * @return a boolean: True denoting the exec can be scheduled on ws 
and false if it cannot
    +     */
    +    private boolean checkWorkerConstraints(ExecutorDetails exec, 
WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> 
scheduleAssignmentMap) {
    --- End diff --
    
    Will add a comment.


> Resource Aware Scheduling
> -------------------------
>
>                 Key: STORM-893
>                 URL: https://issues.apache.org/jira/browse/STORM-893
>             Project: Apache Storm
>          Issue Type: Umbrella
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Boyang Jerry Peng
>         Attachments: resource_aware_scheduler_api.pdf
>
>
> At Yahoo we have been working on resource-aware scheduling in Storm, based 
> on some work done in academia.  This rollup ticket tracks the complete 
> project, which has several subtasks: some that are already done and need to 
> be pushed back, and others that we have not started on yet.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to