[ https://issues.apache.org/jira/browse/STORM-893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14946163#comment-14946163 ]

ASF GitHub Bot commented on STORM-893:
--------------------------------------

Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r41347349
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java ---
    @@ -0,0 +1,478 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package backtype.storm.scheduler.resource.strategies;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Queue;
    +import java.util.TreeMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import backtype.storm.scheduler.Cluster;
    +import backtype.storm.scheduler.ExecutorDetails;
    +import backtype.storm.scheduler.Topologies;
    +import backtype.storm.scheduler.TopologyDetails;
    +import backtype.storm.scheduler.WorkerSlot;
    +import backtype.storm.scheduler.resource.Component;
    +import backtype.storm.scheduler.resource.RAS_Node;
    +
    +public class ResourceAwareStrategy implements IStrategy {
    +    private Logger LOG = null;
    +    private Topologies _topologies;
    +    private Cluster _cluster;
    +    //Map key is the supervisor id and the value is the corresponding RAS_Node Object
    +    private Map<String, RAS_Node> _availNodes;
    +    private RAS_Node refNode = null;
    +    /**
    +     * supervisor id -> Node
    +     */
    +    private Map<String, RAS_Node> _nodes;
    +    private Map<String, List<String>> _clusterInfo;
    +
    +    private final double CPU_WEIGHT = 1.0;
    +    private final double MEM_WEIGHT = 1.0;
    +    private final double NETWORK_WEIGHT = 1.0;
    +
    +    public ResourceAwareStrategy(Cluster cluster, Topologies topologies) {
    +        _topologies = topologies;
    +        _cluster = cluster;
    +        _nodes = RAS_Node.getAllNodesFrom(cluster, _topologies);
    +        _availNodes = this.getAvailNodes();
    +        this.LOG = LoggerFactory.getLogger(this.getClass());
    +        _clusterInfo = cluster.getNetworkTopography();
    +        LOG.debug(this.getClusterInfo());
    +    }
    +
    +    //the returned TreeMap keeps the Components sorted
    +    private TreeMap<Integer, List<ExecutorDetails>> getPriorityToExecutorDetailsListMap(
    +            Queue<Component> ordered__Component_list, Collection<ExecutorDetails> unassignedExecutors) {
    +        TreeMap<Integer, List<ExecutorDetails>> retMap = new TreeMap<Integer, List<ExecutorDetails>>();
    +        Integer rank = 0;
    +        for (Component ras_comp : ordered__Component_list) {
    +            retMap.put(rank, new ArrayList<ExecutorDetails>());
    +            for(ExecutorDetails exec : ras_comp.execs) {
    +                if(unassignedExecutors.contains(exec)) {
    +                    retMap.get(rank).add(exec);
    +                }
    +            }
    +            rank++;
    +        }
    +        return retMap;
    +    }
    +
    +    public Map<WorkerSlot, Collection<ExecutorDetails>> schedule(TopologyDetails td) {
    +        if (_availNodes.size() <= 0) {
    +            LOG.warn("No available nodes to schedule tasks on!");
    +            return null;
    +        }
    +        Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td);
    +        Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<WorkerSlot, Collection<ExecutorDetails>>();
    +        LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
    +        Collection<ExecutorDetails> scheduledTasks = new ArrayList<ExecutorDetails>();
    +        List<Component> spouts = this.getSpouts(_topologies, td);
    +
    +        if (spouts.size() == 0) {
    +            LOG.error("Cannot find a Spout!");
    +            return null;
    +        }
    +
    +        Queue<Component> ordered__Component_list = bfs(_topologies, td, spouts);
    +
    +        Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(ordered__Component_list, unassignedExecutors);
    +        Collection<ExecutorDetails> executorsNotScheduled = new HashSet<ExecutorDetails>(unassignedExecutors);
    +        Integer longestPriorityListSize = this.getLongestPriorityListSize(priorityToExecutorMap);
    +        //Pick the first executor with priority 1, then the first executor with priority 2, and so forth.
    +        //Once we reach the last priority, we go back to priority 1 and schedule the second executor with priority 1.
    +        for (int i = 0; i < longestPriorityListSize; i++) {
    +            for (Entry<Integer, List<ExecutorDetails>> entry : priorityToExecutorMap.entrySet()) {
    +                Iterator<ExecutorDetails> it = entry.getValue().iterator();
    +                if (it.hasNext()) {
    +                    ExecutorDetails exec = it.next();
    +                    LOG.debug("\n\nAttempting to schedule: {} of component {}[avail {}] with rank {}",
    +                            new Object[] { exec, td.getExecutorToComponent().get(exec),
    +                    td.getTaskResourceReqList(exec), entry.getKey() });
    +                    WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
    +                    if (targetSlot != null) {
    +                        RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
    +                        if(!schedulerAssignmentMap.containsKey(targetSlot)) {
    +                            schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
    +                        }
    +                       
    +                        schedulerAssignmentMap.get(targetSlot).add(exec);
    +                        targetNode.consumeResourcesforTask(exec, td);
    +                        scheduledTasks.add(exec);
    +                        LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
    +                                targetNode, targetNode.getAvailableMemoryResources(),
    +                                targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
    +                                targetNode.getTotalCpuResources(), targetSlot);
    +                    } else {
    +                        LOG.error("Not Enough Resources to schedule Task {}", exec);
    +                    }
    +                    it.remove();
    +                }
    +            }
    +        }
    +
    +        executorsNotScheduled.removeAll(scheduledTasks);
    +        LOG.debug("/* Scheduling left over task (most likely sys tasks) */");
    +        // schedule left over system tasks
    +        for (ExecutorDetails exec : executorsNotScheduled) {
    +            WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
    +            if (targetSlot != null) {
    +                RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
    +                if(schedulerAssignmentMap.containsKey(targetSlot) == false) {
    +                    schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
    +                }
    +               
    +                schedulerAssignmentMap.get(targetSlot).add(exec);
    +                targetNode.consumeResourcesforTask(exec, td);
    +                scheduledTasks.add(exec);
    +                LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
    +                        targetNode, targetNode.getAvailableMemoryResources(),
    +                        targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
    +                        targetNode.getTotalCpuResources(), targetSlot);
    +            } else {
    +                LOG.error("Not Enough Resources to schedule Task {}", exec);
    +            }
    +        }
    +        executorsNotScheduled.removeAll(scheduledTasks);
    +        if (executorsNotScheduled.size() > 0) {
    +            LOG.error("Not all executors successfully scheduled: {}",
    +                    executorsNotScheduled);
    +            schedulerAssignmentMap = null;
    +        } else {
    +            LOG.debug("All resources successfully scheduled!");
    +        }
    +        if (schedulerAssignmentMap == null) {
    +            LOG.error("Topology {} not successfully scheduled!", td.getId());
    +        }
    +        return schedulerAssignmentMap;
    +    }
    +
    +    private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
    +      WorkerSlot ws = null;
    +      // first scheduling
    +      if (this.refNode == null) {
    +          String clus = this.getBestClustering();
    +          ws = this.getBestWorker(exec, td, clus, scheduleAssignmentMap);
    +      } else {
    +          ws = this.getBestWorker(exec, td, scheduleAssignmentMap);
    +      }
    +      if(ws != null) {
    +          this.refNode = this.idToNode(ws.getNodeId());
    +      }
    +      LOG.debug("reference node for the resource aware scheduler is: {}", this.refNode);
    +      return ws;
    +    }
    +
    +    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
    +        return this.getBestWorker(exec, td, null, scheduleAssignmentMap);
    +    }
    +
    +    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, String clusterId, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
    +        double taskMem = td.getTotalMemReqTask(exec);
    +        double taskCPU = td.getTotalCpuReqTask(exec);
    +        List<RAS_Node> nodes;
    +        if(clusterId != null) {
    +            nodes = this.getAvailableNodesFromCluster(clusterId);
    +            
    +        } else {
    +            nodes = this.getAvailableNodes();
    +        }
    +        TreeMap<Double, RAS_Node> nodeRankMap = new TreeMap<Double, RAS_Node>();
    --- End diff --
    
    will add comments
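    
    For context, the scheduling order that the comment in schedule() describes ("pick the first executor of each priority, then the second of each, and so on") can be shown with a minimal standalone sketch. This is only an illustration, not part of the patch: plain Strings stand in for Storm's ExecutorDetails, and the class, component, and executor names are made up.
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;
    
    public class PriorityInterleaveSketch {
        public static void main(String[] args) {
            // priority -> executors of that priority; the TreeMap keeps priorities sorted
            TreeMap<Integer, List<String>> priorityToExecutors = new TreeMap<Integer, List<String>>();
            priorityToExecutors.put(0, new ArrayList<String>(Arrays.asList("spout-e1", "spout-e2")));
            priorityToExecutors.put(1, new ArrayList<String>(Arrays.asList("boltA-e1", "boltA-e2", "boltA-e3")));
            priorityToExecutors.put(2, new ArrayList<String>(Arrays.asList("boltB-e1")));
    
            // the longest list decides how many passes the outer loop makes
            int longestPriorityListSize = 0;
            for (List<String> execs : priorityToExecutors.values()) {
                longestPriorityListSize = Math.max(longestPriorityListSize, execs.size());
            }
    
            // Take the first executor of each priority, then the second of each, and so on,
            // mirroring the nested loop in schedule(); the result interleaves components.
            List<String> schedulingOrder = new ArrayList<String>();
            for (int i = 0; i < longestPriorityListSize; i++) {
                for (Map.Entry<Integer, List<String>> entry : priorityToExecutors.entrySet()) {
                    Iterator<String> it = entry.getValue().iterator();
                    if (it.hasNext()) {
                        schedulingOrder.add(it.next());
                        it.remove();
                    }
                }
            }
            // Prints: [spout-e1, boltA-e1, boltB-e1, spout-e2, boltA-e2, boltA-e3]
            System.out.println(schedulingOrder);
        }
    }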


> Resource Aware Scheduling
> -------------------------
>
>                 Key: STORM-893
>                 URL: https://issues.apache.org/jira/browse/STORM-893
>             Project: Apache Storm
>          Issue Type: Umbrella
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Boyang Jerry Peng
>         Attachments: resource_aware_scheduler_api.pdf
>
>
> At Yahoo we have been working on resource aware scheduling in Storm, based
> on some work done in academia. This rollup ticket tracks the complete
> project, with several sub-tasks: some are already done and need to be
> pushed back, and others we have not started on yet.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
