[
https://issues.apache.org/jira/browse/STORM-893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14942329#comment-14942329
]
ASF GitHub Bot commented on STORM-893:
--------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/746#discussion_r41087631
--- Diff:
storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
---
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.ExecutorDetails;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.scheduler.resource.Component;
+import backtype.storm.scheduler.resource.RAS_Node;
+
+public class ResourceAwareStrategy implements IStrategy {
+ private Logger LOG = null;
+ private Topologies _topologies;
+ private Cluster _cluster;
+ //Map key is the supervisor id and the value is the corresponding
RAS_Node Object
+ private Map<String, RAS_Node> _availNodes;
+ private RAS_Node refNode = null;
+ /**
+ * supervisor id -> Node
+ */
+ private Map<String, RAS_Node> _nodes;
+ private Map<String, List<String>> _clusterInfo;
+
+ private final double CPU_WEIGHT = 1.0;
+ private final double MEM_WEIGHT = 1.0;
+ private final double NETWORK_WEIGHT = 1.0;
+
+ public ResourceAwareStrategy(Cluster cluster, Topologies topologies) {
+ _topologies = topologies;
+ _cluster = cluster;
+ _nodes = RAS_Node.getAllNodesFrom(cluster, _topologies);
+ _availNodes = this.getAvailNodes();
+ this.LOG = LoggerFactory.getLogger(this.getClass());
+ _clusterInfo = cluster.getNetworkTopography();
+ LOG.info(this.getClusterInfo());
+ }
+
+ //the returned TreeMap keeps the Components sorted
+ private TreeMap<Integer, List<ExecutorDetails>>
getPriorityToExecutorDetailsListMap(
+ Queue<Component> ordered__Component_list,
Collection<ExecutorDetails> unassignedExecutors) {
+ TreeMap<Integer, List<ExecutorDetails>> retMap = new
TreeMap<Integer, List<ExecutorDetails>>();
+ Integer rank = 0;
+ for (Component ras_comp : ordered__Component_list) {
+ retMap.put(rank, new ArrayList<ExecutorDetails>());
+ for(ExecutorDetails exec : ras_comp.execs) {
+ if(unassignedExecutors.contains(exec)) {
+ retMap.get(rank).add(exec);
+ }
+ }
+ rank++;
+ }
+ return retMap;
+ }
+
+ public Map<WorkerSlot, Collection<ExecutorDetails>>
schedule(TopologyDetails td) {
+ if (_availNodes.size() <= 0) {
+ LOG.warn("No available nodes to schedule tasks on!");
+ return null;
+ }
+ Collection<ExecutorDetails> unassignedExecutors =
_cluster.getUnassignedExecutors(td);
+ Map<WorkerSlot, Collection<ExecutorDetails>>
schedulerAssignmentMap = new HashMap<WorkerSlot, Collection<ExecutorDetails>>();
+ LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
+ Collection<ExecutorDetails> scheduledTasks = new
ArrayList<ExecutorDetails>();
+ List<Component> spouts = this.getSpouts(_topologies, td);
+
+ if (spouts.size() == 0) {
+ LOG.error("Cannot find a Spout!");
+ return null;
+ }
+
+ Queue<Component> ordered__Component_list = bfs(_topologies, td,
spouts);
+
+ Map<Integer, List<ExecutorDetails>> priorityToExecutorMap =
getPriorityToExecutorDetailsListMap(ordered__Component_list,
unassignedExecutors);
+ Collection<ExecutorDetails> executorsNotScheduled = new
HashSet<ExecutorDetails>(unassignedExecutors);
+ Integer longestPriorityListSize =
this.getLongestPriorityListSize(priorityToExecutorMap);
+ //Pick the first executor with priority one, then the 1st exec
with priority 2, so on an so forth.
+ //Once we reach the last priority, we go back to priority 1 and
schedule the second task with priority 1.
+ for (int i = 0; i < longestPriorityListSize; i++) {
+ for (Entry<Integer, List<ExecutorDetails>> entry :
priorityToExecutorMap.entrySet()) {
+ Iterator<ExecutorDetails> it = entry.getValue().iterator();
+ if (it.hasNext()) {
+ ExecutorDetails exec = it.next();
+ LOG.info("\n\nAttempting to schedule: {} of component
{}[avail {}] with rank {}",
--- End diff --
Again, I think all of these should be debug messages. We don't need them
printed out all the time, but we can turn them on when we are
debugging.
> Resource Aware Scheduling
> -------------------------
>
> Key: STORM-893
> URL: https://issues.apache.org/jira/browse/STORM-893
> Project: Apache Storm
> Issue Type: Umbrella
> Reporter: Robert Joseph Evans
> Assignee: Boyang Jerry Peng
> Attachments: resource_aware_scheduler_api.pdf
>
>
> At Yahoo we have been working on resource-aware scheduling in Storm, based
> on some work done in academia. This rollup ticket tracks the complete
> project and its several subtasks: some are already done and need to be
> pushed back, and others we have not started on yet.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)