[ 
https://issues.apache.org/jira/browse/STORM-893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14938836#comment-14938836
 ] 

ASF GitHub Bot commented on STORM-893:
--------------------------------------

Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40849301
  
    --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java 
---
    @@ -79,11 +95,302 @@ public StormTopology getTopology() {
                     ret.put(executor, compId);
                 }
             }
    -        
    +
             return ret;
         }
    -    
    +
         public Collection<ExecutorDetails> getExecutors() {
             return this.executorToComponent.keySet();
         }
    +
    +    private void initResourceList() {
    +        _resourceList = new HashMap<ExecutorDetails, Map<String, 
Double>>();
    +        // Extract bolt memory info
    +        if (this.topology.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> bolt : 
this.topology.get_bolts().entrySet()) {
    +                //the json_conf is populated by TopologyBuilder (e.g. 
boltDeclarer.setMemoryLoad)
    +                Map<String, Double> topology_resources = 
ResourceUtils.parseResources(bolt
    +                        .getValue().get_common().get_json_conf());
    +                ResourceUtils.checkIntialization(topology_resources, 
bolt.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> 
anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if 
(bolt.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), 
topology_resources);
    +                    }
    +                }
    +            }
    +        }
    +        // Extract spout memory info
    +        if (this.topology.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spout : 
this.topology.get_spouts().entrySet()) {
    +                Map<String, Double> topology_resources = 
ResourceUtils.parseResources(spout
    +                        .getValue().get_common().get_json_conf());
    +                ResourceUtils.checkIntialization(topology_resources, 
spout.getValue().toString(), this.topologyConf);
    +                for (Map.Entry<ExecutorDetails, String> 
anExecutorToComponent : executorToComponent.entrySet()) {
    +                    if 
(spout.getKey().equals(anExecutorToComponent.getValue())) {
    +                        _resourceList.put(anExecutorToComponent.getKey(), 
topology_resources);
    +                    }
    +                }
    +            }
    +        } else {
    +            LOG.warn("Topology " + topologyId + " does not seem to have 
any spouts!");
    +        }
    +        //schedule tasks that are not part of components returned from 
topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker 
tasks)
    +        for(ExecutorDetails exec : this.getExecutors()) {
    +            if (_resourceList.containsKey(exec) == false) {
    +                LOG.debug(
    +                        "Scheduling {} {} with memory requirement as 'on 
heap' - {} and 'off heap' - {} and CPU requirement as {}",
    +                        this.getExecutorToComponent().get(exec),
    +                        exec,
    +                        
this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
    +                        
this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
    +                        
this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
    +                this.addDefaultResforExec(exec);
    +            } 
    +        }
    +    }
    +
    +    private List<ExecutorDetails> componentToExecs(String comp) {
    +        List<ExecutorDetails> execs = new ArrayList<>();
    +        for (Map.Entry<ExecutorDetails, String> entry : 
executorToComponent.entrySet()) {
    +            if (entry.getValue().equals(comp)) {
    +                execs.add(entry.getKey());
    +            }
    +        }
    +        return execs;
    +    }
    +
    +    /**
    +     * Returns a representation of the non-system components of the 
topology graph
    +     * Each Component object in the returning map is populated with the 
list of its
    +     * parents, children and execs assigned to that component.
    +     * @return a map of components
    +     */
    +    public Map<String, Component> getComponents() {
    +        Map<String, Component> all_comp = new HashMap<String, Component>();
    +
    +        StormTopology storm_topo = this.topology;
    +        // spouts
    +        if (storm_topo.get_spouts() != null) {
    +            for (Map.Entry<String, SpoutSpec> spoutEntry : storm_topo
    +                    .get_spouts().entrySet()) {
    +                if (!Utils.isSystemId(spoutEntry.getKey())) {
    +                    Component newComp = null;
    +                    if (all_comp.containsKey(spoutEntry.getKey())) {
    +                        newComp = all_comp.get(spoutEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                    } else {
    +                        newComp = new Component(spoutEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                        all_comp.put(spoutEntry.getKey(), newComp);
    +                    }
    +                    newComp.type = Component.ComponentType.SPOUT;
    +
    +                    for (Map.Entry<GlobalStreamId, Grouping> spoutInput : 
spoutEntry
    +                            .getValue().get_common().get_inputs()
    +                            .entrySet()) {
    +                        newComp.parents.add(spoutInput.getKey()
    +                                .get_componentId());
    +                        if (!all_comp.containsKey(spoutInput
    +                                .getKey().get_componentId())) {
    +                            all_comp.put(spoutInput.getKey()
    +                                            .get_componentId(),
    +                                    new Component(spoutInput.getKey()
    +                                            .get_componentId()));
    +                        }
    +                        all_comp.get(spoutInput.getKey()
    +                                .get_componentId()).children.add(spoutEntry
    +                                .getKey());
    +                    }
    +                }
    +            }
    +        }
    +        // bolts
    +        if (storm_topo.get_bolts() != null) {
    +            for (Map.Entry<String, Bolt> boltEntry : storm_topo.get_bolts()
    +                    .entrySet()) {
    +                if (!Utils.isSystemId(boltEntry.getKey())) {
    +                    Component newComp = null;
    +                    if (all_comp.containsKey(boltEntry.getKey())) {
    +                        newComp = all_comp.get(boltEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                    } else {
    +                        newComp = new Component(boltEntry.getKey());
    +                        newComp.execs = componentToExecs(newComp.id);
    +                        all_comp.put(boltEntry.getKey(), newComp);
    +                    }
    +                    newComp.type = Component.ComponentType.BOLT;
    +
    +                    for (Map.Entry<GlobalStreamId, Grouping> boltInput : 
boltEntry
    +                            .getValue().get_common().get_inputs()
    +                            .entrySet()) {
    +                        newComp.parents.add(boltInput.getKey()
    +                                .get_componentId());
    +                        if (!all_comp.containsKey(boltInput
    +                                .getKey().get_componentId())) {
    +                            all_comp.put(boltInput.getKey()
    +                                            .get_componentId(),
    +                                    new Component(boltInput.getKey()
    +                                            .get_componentId()));
    +                        }
    +                        all_comp.get(boltInput.getKey()
    +                                .get_componentId()).children.add(boltEntry
    +                                .getKey());
    +                    }
    +                }
    +            }
    +        }
    +        return all_comp;
    +    }
    +
    +    /**
    +     * Gets the on heap memory requirement for a
    +     * certain task within a topology
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the amount of on heap memory
    +     * requirement for this exec in topology topoId.
    +     */
    +    public Double getOnHeapMemoryRequirement(ExecutorDetails exec) {
    +        Double ret = null;
    +        if (hasExecInTopo(exec)) {
    +            ret = _resourceList
    +                    .get(exec)
    +                    
.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Gets the off heap memory requirement for a
    +     * certain task within a topology
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the amount of off heap memory
    +     * requirement for this exec in topology topoId.
    +     */
    +    public Double getOffHeapMemoryRequirement(ExecutorDetails exec) {
    +        Double ret = null;
    +        if (hasExecInTopo(exec)) {
    +            ret = _resourceList
    +                    .get(exec)
    +                    
.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Gets the total memory requirement for a task
    +     * @param exec the executor the inquiry is concerning.
    +     * @return Double the total memory requirement
    +     *  for this exec in topology topoId.
    +     */
    +    public Double getTotalMemReqTask(ExecutorDetails exec) {
    +        if (hasExecInTopo(exec)) {
    +            return getOffHeapMemoryRequirement(exec)
    +                    + getOnHeapMemoryRequirement(exec);
    +        }
    +        LOG.info("cannot find {}", exec);
    +        return null;
    +    }
    +
    +    /**
    +     * Gets the total memory resource list for a
    +     * set of tasks that is part of a topology.
    +     * @return Map<ExecutorDetails, Double> a map of the total memory 
requirement
    +     *  for all tasks in topology topoId.
    +     */
    +    public Map<ExecutorDetails, Double> getTotalMemoryResourceList() {
    +        Map<ExecutorDetails, Double> ret = new HashMap<ExecutorDetails, 
Double>();
    +        for (ExecutorDetails exec : _resourceList.keySet()) {
    +            ret.put(exec, getTotalMemReqTask(exec));
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Get the total CPU requirement for executor
    +     * @param exec
    +     * @return Double the total about of cpu requirement for executor
    +     */
    +    public Double getTotalCpuReqTask(ExecutorDetails exec) {
    +        if (hasExecInTopo(exec)) {
    +            return _resourceList
    +                    .get(exec)
    +                    .get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
    +        }
    +        LOG.warn("cannot find - {}", exec);
    +        return null;
    +    }
    +
    +    /**
    +     * get the resources requirements for a executor
    +     * @param exec
    +     * @return a map containing the resource requirements for this exec
    +     */
    +    public Map<String, Double> getTaskResourceReqList(ExecutorDetails 
exec) {
    +        if (hasExecInTopo(exec)) {
    +            return _resourceList.get(exec);
    +        }
    +        LOG.warn("cannot find - {}", exec);
    --- End diff --
    
    will remove


> Resource Aware Scheduling
> -------------------------
>
>                 Key: STORM-893
>                 URL: https://issues.apache.org/jira/browse/STORM-893
>             Project: Apache Storm
>          Issue Type: Umbrella
>            Reporter: Robert Joseph Evans
>            Assignee: Boyang Jerry Peng
>         Attachments: resource_aware_scheduler_api.pdf
>
>
> At Yahoo we have been working on resource-aware scheduling in Storm, based 
> on some work done in academia.  This rollup ticket is to track the 
> complete project, with several sub-tasks: some that are already done and 
> need to be pushed back, and others that we have not started on yet.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to