[ https://issues.apache.org/jira/browse/STORM-893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14938304#comment-14938304 ]
ASF GitHub Bot commented on STORM-893:
--------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/746#discussion_r40838080
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
@@ -79,11 +95,302 @@ public StormTopology getTopology() {
ret.put(executor, compId);
}
}
-
+
return ret;
}
-
+
public Collection<ExecutorDetails> getExecutors() {
return this.executorToComponent.keySet();
}
+
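+    /**
+     * Populates _resourceList, mapping each executor to the memory and CPU
+     * resources parsed from its component's json_conf. Executors whose
+     * components are not among the topology's spouts or bolts (e.g. system
+     * tasks such as __acker) fall back to the topology-level defaults.
+     */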
+    private void initResourceList() {
+        _resourceList = new HashMap<ExecutorDetails, Map<String, Double>>();
+        // Extract bolt memory info
+        if (this.topology.get_bolts() != null) {
+            for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
+                // The json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad).
+                Map<String, Double> topology_resources = ResourceUtils.parseResources(bolt
+                        .getValue().get_common().get_json_conf());
+                ResourceUtils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf);
+                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
+                    if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
+                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
+                    }
+                }
+            }
+        }
+        // Extract spout memory info
+        if (this.topology.get_spouts() != null) {
+            for (Map.Entry<String, SpoutSpec> spout : this.topology.get_spouts().entrySet()) {
+                Map<String, Double> topology_resources = ResourceUtils.parseResources(spout
+                        .getValue().get_common().get_json_conf());
+                ResourceUtils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf);
+                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
+                    if (spout.getKey().equals(anExecutorToComponent.getValue())) {
+                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
+                    }
+                }
+            }
+        } else {
+            LOG.warn("Topology " + topologyId + " does not seem to have any spouts!");
+        }
+        // Schedule tasks that are not part of components returned from topology.get_spouts()
+        // or topology.get_bolts(), i.e. system tasks, most notably the __acker tasks.
+        for (ExecutorDetails exec : this.getExecutors()) {
+            if (!_resourceList.containsKey(exec)) {
+                LOG.debug(
+                        "Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}",
+                        this.getExecutorToComponent().get(exec),
+                        exec,
+                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
+                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
+                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
+                this.addDefaultResforExec(exec);
+            }
+        }
+    }
+
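+    /**
+     * Collects all executors assigned to the given component id.
+     */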
+ private List<ExecutorDetails> componentToExecs(String comp) {
+ List<ExecutorDetails> execs = new ArrayList<>();
+        for (Map.Entry<ExecutorDetails, String> entry : executorToComponent.entrySet()) {
+ if (entry.getValue().equals(comp)) {
+ execs.add(entry.getKey());
+ }
+ }
+ return execs;
+ }
+
+    /**
+     * Returns a representation of the non-system components of the topology graph.
+     * Each Component object in the returned map is populated with the list of its
+     * parents, children, and executors assigned to that component.
+     * @return a map of components
+     */
+ public Map<String, Component> getComponents() {
+ Map<String, Component> all_comp = new HashMap<String, Component>();
+
+ StormTopology storm_topo = this.topology;
+ // spouts
+ if (storm_topo.get_spouts() != null) {
+ for (Map.Entry<String, SpoutSpec> spoutEntry : storm_topo
+ .get_spouts().entrySet()) {
+ if (!Utils.isSystemId(spoutEntry.getKey())) {
+ Component newComp = null;
+ if (all_comp.containsKey(spoutEntry.getKey())) {
+ newComp = all_comp.get(spoutEntry.getKey());
+ newComp.execs = componentToExecs(newComp.id);
+ } else {
+ newComp = new Component(spoutEntry.getKey());
+ newComp.execs = componentToExecs(newComp.id);
+ all_comp.put(spoutEntry.getKey(), newComp);
+ }
+ newComp.type = Component.ComponentType.SPOUT;
+
+                for (Map.Entry<GlobalStreamId, Grouping> spoutInput : spoutEntry
+ .getValue().get_common().get_inputs()
+ .entrySet()) {
+ newComp.parents.add(spoutInput.getKey()
+ .get_componentId());
+ if (!all_comp.containsKey(spoutInput
+ .getKey().get_componentId())) {
+ all_comp.put(spoutInput.getKey()
+ .get_componentId(),
+ new Component(spoutInput.getKey()
+ .get_componentId()));
+ }
+ all_comp.get(spoutInput.getKey()
+ .get_componentId()).children.add(spoutEntry
+ .getKey());
+ }
+ }
+ }
+ }
+ // bolts
+ if (storm_topo.get_bolts() != null) {
+ for (Map.Entry<String, Bolt> boltEntry : storm_topo.get_bolts()
+ .entrySet()) {
+ if (!Utils.isSystemId(boltEntry.getKey())) {
+ Component newComp = null;
+ if (all_comp.containsKey(boltEntry.getKey())) {
+ newComp = all_comp.get(boltEntry.getKey());
+ newComp.execs = componentToExecs(newComp.id);
+ } else {
+ newComp = new Component(boltEntry.getKey());
+ newComp.execs = componentToExecs(newComp.id);
+ all_comp.put(boltEntry.getKey(), newComp);
+ }
+ newComp.type = Component.ComponentType.BOLT;
+
+                for (Map.Entry<GlobalStreamId, Grouping> boltInput : boltEntry
+ .getValue().get_common().get_inputs()
+ .entrySet()) {
+ newComp.parents.add(boltInput.getKey()
+ .get_componentId());
+ if (!all_comp.containsKey(boltInput
+ .getKey().get_componentId())) {
+ all_comp.put(boltInput.getKey()
+ .get_componentId(),
+ new Component(boltInput.getKey()
+ .get_componentId()));
+ }
+ all_comp.get(boltInput.getKey()
+ .get_componentId()).children.add(boltEntry
+ .getKey());
+ }
+ }
+ }
+ }
+ return all_comp;
+ }
+
+    /**
+     * Gets the on-heap memory requirement for a given executor in this topology.
+     * @param exec the executor the inquiry is concerning.
+     * @return the amount of on-heap memory this executor requires.
+     */
+ public Double getOnHeapMemoryRequirement(ExecutorDetails exec) {
+ Double ret = null;
+ if (hasExecInTopo(exec)) {
+            ret = _resourceList.get(exec)
+                    .get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+ }
+ return ret;
+ }
+
+    /**
+     * Gets the off-heap memory requirement for a given executor in this topology.
+     * @param exec the executor the inquiry is concerning.
+     * @return the amount of off-heap memory this executor requires.
+     */
+ public Double getOffHeapMemoryRequirement(ExecutorDetails exec) {
+ Double ret = null;
+ if (hasExecInTopo(exec)) {
+            ret = _resourceList.get(exec)
+                    .get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+ }
+ return ret;
+ }
+
+    /**
+     * Gets the total memory requirement for a given executor.
+     * @param exec the executor the inquiry is concerning.
+     * @return the total (on-heap plus off-heap) memory this executor requires.
+     */
+ public Double getTotalMemReqTask(ExecutorDetails exec) {
+ if (hasExecInTopo(exec)) {
+ return getOffHeapMemoryRequirement(exec)
+ + getOnHeapMemoryRequirement(exec);
+ }
+ LOG.info("cannot find {}", exec);
--- End diff --
Why does this need a log message? Does not seem to add much.
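For reference, a minimal sketch of how the method could read with the log dropped, leaving the unknown-executor case to the caller. This only illustrates the suggestion above, not code from the pull request; the null return for an unknown executor is assumed from the surrounding getters, which behave the same way.

    public Double getTotalMemReqTask(ExecutorDetails exec) {
        if (hasExecInTopo(exec)) {
            return getOffHeapMemoryRequirement(exec)
                    + getOnHeapMemoryRequirement(exec);
        }
        // No log here: callers that care can check hasExecInTopo(exec)
        // themselves, or log at debug level where a miss is unexpected.
        return null;
    }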
> Resource Aware Scheduling
> -------------------------
>
> Key: STORM-893
> URL: https://issues.apache.org/jira/browse/STORM-893
> Project: Apache Storm
> Issue Type: Umbrella
> Reporter: Robert Joseph Evans
> Assignee: Boyang Jerry Peng
> Attachments: resource_aware_scheduler_api.pdf
>
>
> At Yahoo we have been working on resource aware scheduling in Storm, based
> on some work done in academia. This rollup ticket tracks the complete
> project, with several subtasks: some are already done and need to be
> pushed back, and others we have not started on yet.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)