[ 
https://issues.apache.org/jira/browse/STORM-893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14935904#comment-14935904
 ] 

ASF GitHub Bot commented on STORM-893:
--------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40732622
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
    @@ -79,11 +96,304 @@ public StormTopology getTopology() {
                     ret.put(executor, compId);
                 }
             }
    -        
    +
             return ret;
         }
    -    
    +
         public Collection<ExecutorDetails> getExecutors() {
             return this.executorToComponent.keySet();
         }
    +
    +    private void initResourceList() {
    +        _resourceList = new HashMap<ExecutorDetails, Map<String, Double>>();
    +        // Extract bolt memory info
    +        if (this.topology.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
    +                //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
    +                Map<String, Double> topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(bolt
    +                        .getValue().get_common().get_json_conf());
    +                backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        } else {
    +            LOG.warn("Topology " + topologyId + " does not seem to have any bolts!");
    +        }
    +        // Extract spout memory info
    +        if (this.topology.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spout : this.topology.get_spouts().entrySet()) {
    +                Map<String, Double> topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(spout
    +                        .getValue().get_common().get_json_conf());
    +                backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if (spout.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
    +                    }
    +                }
    +            }
    +        } else {
    +            LOG.warn("Topology " + topologyId + " does not seem to have any spouts!");
    +        }
    +        //schedule tasks that are not part of components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks)
    +        for(ExecutorDetails exec : this.getExecutors()) {
    +            if (_resourceList.containsKey(exec) == false) {
    +                LOG.info(
    +                        "Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}",
    +                        this.getExecutorToComponent().get(exec),
    +                        exec,
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
    +                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
    +                this.addDefaultResforExec(exec);
    +            } 
    +        }
    +    }
    +
    +    private List<ExecutorDetails> componentToExecs(String comp) {
    +        List<ExecutorDetails> execs = new ArrayList<>();
    +        for (Map.Entry<ExecutorDetails, String> entry : executorToComponent.entrySet()) {
    +            if (entry.getValue().equals(comp)) {
    +                execs.add(entry.getKey());
    +            }
    +        }
    +        return execs;
    +    }
    +
    +    /**
    +     * Returns a representation of the non-system components of the topology graph
    +     * Each Component object in the returning map is populated with the list of its
    +     * parents, children and execs assigned to that component.
    +     * @return a map of components
    +     */
    +    public Map<String, RAS_Component> getRAS_Components() {
    --- End diff --
    
    I'm not sure that RAS_Component is the right name for this class or this
    method.  The class has nothing to do with resources at all.  Perhaps we can
    call it a Component and the method getComponentGraph instead?
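
    As a rough illustration of the suggested naming only, here is a minimal
    sketch, not the code in the pull request. Everything in it is a
    hypothetical stand-in: the Component fields, the signature of
    getComponentGraph, the "inputs" map (component id to upstream component
    ids), and the use of plain strings in place of ExecutorDetails.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical replacement for RAS_Component: a plain node of the topology graph. */
class Component {
    final String id;
    final List<String> parents = new ArrayList<>();
    final List<String> children = new ArrayList<>();
    // Assumption: strings stand in for ExecutorDetails to keep the sketch self-contained.
    final List<String> execs = new ArrayList<>();

    Component(String id) {
        this.id = id;
    }
}

class ComponentGraphSketch {
    /**
     * Builds a map of non-system components, each populated with its parents,
     * children and executors.
     *
     * @param inputs              hypothetical input: component id -> ids it reads from
     * @param executorToComponent hypothetical input: executor name -> component id
     */
    static Map<String, Component> getComponentGraph(
            Map<String, List<String>> inputs,
            Map<String, String> executorToComponent) {
        Map<String, Component> graph = new LinkedHashMap<>();
        for (String id : inputs.keySet()) {
            graph.put(id, new Component(id));
        }
        // Link each component to its upstream components.
        for (Map.Entry<String, List<String>> entry : inputs.entrySet()) {
            Component child = graph.get(entry.getKey());
            for (String parentId : entry.getValue()) {
                Component parent = graph.get(parentId);
                if (parent == null) {
                    continue; // upstream id is not a known component (e.g. a system component)
                }
                child.parents.add(parentId);
                parent.children.add(child.id);
            }
        }
        // Attach executors to the component they run.
        for (Map.Entry<String, String> entry : executorToComponent.entrySet()) {
            Component comp = graph.get(entry.getValue());
            if (comp != null) {
                comp.execs.add(entry.getKey());
            }
        }
        return graph;
    }

    public static void main(String[] args) {
        Map<String, List<String>> inputs = new LinkedHashMap<>();
        inputs.put("spout", Collections.<String>emptyList());
        inputs.put("bolt", Arrays.asList("spout"));

        Map<String, String> executorToComponent = new LinkedHashMap<>();
        executorToComponent.put("[1-1]", "spout");
        executorToComponent.put("[2-2]", "bolt");

        for (Component c : getComponentGraph(inputs, executorToComponent).values()) {
            System.out.println(c.id + " parents=" + c.parents
                    + " children=" + c.children + " execs=" + c.execs);
        }
    }
}
```

    The point of the sketch is that nothing in the node type refers to
    resources, so a neutral name describes it accurately.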


> Resource Aware Scheduling
> -------------------------
>
>                 Key: STORM-893
>                 URL: https://issues.apache.org/jira/browse/STORM-893
>             Project: Apache Storm
>          Issue Type: Umbrella
>            Reporter: Robert Joseph Evans
>            Assignee: Boyang Jerry Peng
>         Attachments: resource_aware_scheduler_api.pdf
>
>
> At Yahoo we have been working on resource-aware scheduling in Storm, based 
> on some work done in academia.  This rollup ticket tracks the complete 
> project and its subtasks: some are already done and need to be pushed 
> back, and others we have not started on yet.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
