[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/746#issuecomment-146896112 @revans2 thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on the pull request: https://github.com/apache/storm/pull/746#issuecomment-146895509 @jerrypeng I merged this into master. Looking at the sub tasks for STORM-893, this looked more like STORM-894.
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/746
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/746#issuecomment-146880546 @revans2 Thank you!
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on the pull request: https://github.com/apache/storm/pull/746#issuecomment-146880374 @jerrypeng Technically, with one +1 from a committer and 24 hours we can merge this in. But this is a large change, so I am glad to see more people looking at it. I'll try to go through it again and hopefully merge it in shortly.
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/746#issuecomment-146875059 I have two +1s now, can we merge?
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user zhuoliu commented on the pull request: https://github.com/apache/storm/pull/746#issuecomment-146692455 Nice work! +1
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/746#issuecomment-146266179 @erikdw Thank you for your review. I reformatted some of the files based on your suggestions.
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r41412262
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/Cluster.java ---
@@ -438,6 +451,35 @@ public SupervisorDetails getSupervisorById(String nodeId) {
+            for(String hostName: resolvedSuperVisors.keySet()) {
--- End diff --

will fix
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r41410090
--- Diff: storm-core/src/jvm/backtype/storm/networktopography/DNSToSwitchMapping.java ---
@@ -0,0 +1,50 @@
+public interface DNSToSwitchMapping {
+  public final static String DEFAULT_RACK = "/default-rack";
--- End diff --

will reformat
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r41409816
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java ---
@@ -0,0 +1,37 @@
+public interface IStrategy {
+
+        public Map<WorkerSlot, Collection<ExecutorDetails>> schedule(TopologyDetails td);
--- End diff --

will reformat
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user erikdw commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r41366711
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/Cluster.java ---
@@ -438,6 +451,35 @@ public SupervisorDetails getSupervisorById(String nodeId) {
         return this.supervisors;
     }
 
+    /*
+    * Note: Make sure the proper conf was passed into the Cluster constructor before calling this function
+    * It tries to load the proper network topography detection plugin specified in the config.
+    */
+    public Map<String, List<String>> getNetworkTopography() {
+        if (networkTopography == null) {
+            networkTopography = new HashMap<String, List<String>>();
+            ArrayList<String> supervisorHostNames = new ArrayList<String>();
+            for (SupervisorDetails s : supervisors.values()) {
+                supervisorHostNames.add(s.getHost());
+            }
+
+            String clazz = (String) conf.get(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN);
+            DNSToSwitchMapping topographyMapper = (DNSToSwitchMapping) Utils.newInstance(clazz);
+
+            Map<String, String> resolvedSuperVisors = topographyMapper.resolve(supervisorHostNames);
+            for(String hostName: resolvedSuperVisors.keySet()) {
--- End diff --

Please use consistent for/if/while styling. i.e., most of your new code is `for(`, but some (see 9 lines above) is `for (`. It *seems* like the storm project is more consistently using `for (`. This comment also applies to the `){` at the end of various lines. That should always be `) {` IMNSHO.
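The quoted loop is cut off, so what it does with each resolved host is not visible in this thread. One plausible reading, given that `getNetworkTopography()` fills a map after resolving host names, is that it inverts a host-to-rack mapping into a rack-to-hosts mapping. The sketch below illustrates only that idea and is not the code from the pull request: the `RackGrouping` class, its `groupByRack` method, and the fallback to a default rack for unresolved hosts are all assumptions made for the example. It also follows the `for (` and `) {` spacing requested in the review.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the pull request.
final class RackGrouping {
    // Same literal as DEFAULT_RACK in the quoted DNSToSwitchMapping diff;
    // using it as a fallback here is an assumption.
    static final String DEFAULT_RACK = "/default-rack";

    // Inverts a host -> rack mapping into rack -> list of hosts.
    static Map<String, List<String>> groupByRack(Map<String, String> hostToRack) {
        Map<String, List<String>> rackToHosts = new HashMap<String, List<String>>();
        for (Map.Entry<String, String> entry : hostToRack.entrySet()) {
            String rack = entry.getValue();
            if (rack == null) {
                rack = DEFAULT_RACK; // unresolved hosts share one rack
            }
            List<String> hosts = rackToHosts.get(rack);
            if (hosts == null) {
                hosts = new ArrayList<String>();
                rackToHosts.put(rack, hosts);
            }
            hosts.add(entry.getKey());
        }
        return rackToHosts;
    }

    public static void main(String[] args) {
        Map<String, String> resolved = new HashMap<String, String>();
        resolved.put("host-a", "/rack-1");
        resolved.put("host-b", "/rack-1");
        resolved.put("host-c", null);
        System.out.println(groupByRack(resolved));
    }
}
```

Iterating over `entrySet()` rather than `keySet()` avoids a second lookup per host; whether the original loop does the same cannot be told from the truncated quote.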
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user erikdw commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r41366483
--- Diff: storm-core/src/jvm/backtype/storm/networktopography/DNSToSwitchMapping.java ---
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.networktopography;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An interface that must be implemented to allow pluggable
+ * DNS-name/IP-address to RackID resolvers.
+ *
+ */
+public interface DNSToSwitchMapping {
+  public final static String DEFAULT_RACK = "/default-rack";
--- End diff --

please use consistent indentation. The java code in the storm project seems to use 4-space indents. This applies to this file as well as AbstractDNSToSwitchMapping.java. Maybe others that I've missed with all the generated code making this a bit hard to read through.
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user erikdw commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r41366160
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java ---
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies;
+
+import java.util.Collection;
+import java.util.Map;
+
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.ExecutorDetails;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.scheduler.resource.RAS_Node;
+
+/**
+ * An interface to for implementing different scheduling strategies for the resource aware scheduling
+ * In the future stategies will be pluggable
+ */
+public interface IStrategy {
+
+        public Map<WorkerSlot, Collection<ExecutorDetails>> schedule(TopologyDetails td);
--- End diff --

this is either a tab (evil) or 8 spaces. It seems most of this java code has 4 space indents.
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/746#issuecomment-146066442 @zhuoliu Thanks for your review. I just pushed a commit that I think addresses all your comments.
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r41347393
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java ---
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.ExecutorDetails;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.scheduler.resource.Component;
+import backtype.storm.scheduler.resource.RAS_Node;
+
+public class ResourceAwareStrategy implements IStrategy {
+    private Logger LOG = null;
+    private Topologies _topologies;
+    private Cluster _cluster;
+    //Map key is the supervisor id and the value is the corresponding RAS_Node Object
+    private Map<String, RAS_Node> _availNodes;
+    private RAS_Node refNode = null;
+    /**
+     * supervisor id -> Node
+     */
+    private Map<String, RAS_Node> _nodes;
+    private Map<String, List<String>> _clusterInfo;
+
+    private final double CPU_WEIGHT = 1.0;
+    private final double MEM_WEIGHT = 1.0;
+    private final double NETWORK_WEIGHT = 1.0;
+
+    public ResourceAwareStrategy(Cluster cluster, Topologies topologies) {
+        _topologies = topologies;
+        _cluster = cluster;
+        _nodes = RAS_Node.getAllNodesFrom(cluster, _topologies);
+        _availNodes = this.getAvailNodes();
+        this.LOG = LoggerFactory.getLogger(this.getClass());
+        _clusterInfo = cluster.getNetworkTopography();
+        LOG.debug(this.getClusterInfo());
+    }
+
+    //the returned TreeMap keeps the Components sorted
+    private TreeMap<Integer, List<ExecutorDetails>> getPriorityToExecutorDetailsListMap(
+            Queue<Component> ordered__Component_list, Collection<ExecutorDetails> unassignedExecutors) {
+        TreeMap<Integer, List<ExecutorDetails>> retMap = new TreeMap<Integer, List<ExecutorDetails>>();
+        Integer rank = 0;
+        for (Component ras_comp : ordered__Component_list) {
+            retMap.put(rank, new ArrayList<ExecutorDetails>());
+            for(ExecutorDetails exec : ras_comp.execs) {
+                if(unassignedExecutors.contains(exec)) {
+                    retMap.get(rank).add(exec);
+                }
+            }
+            rank++;
+        }
+        return retMap;
+    }
+
+    public Map<WorkerSlot, Collection<ExecutorDetails>> schedule(TopologyDetails td) {
+        if (_availNodes.size() <= 0) {
+            LOG.warn("No available nodes to schedule tasks on!");
+            return null;
+        }
+        Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td);
+        Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<WorkerSlot, Collection<ExecutorDetails>>();
+        LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
+        Collection<ExecutorDetails> scheduledTasks = new ArrayList<ExecutorDetails>();
+        List<Component> spouts = this.getSpouts(_topologies, td);
+
+        if (spouts.size() == 0) {
+            LOG.error("Cannot find a Spout!");
+            return null;
+        }
+
+        Queue<Component> ordered__Component_list = bfs(_topologies, td, spouts);
+
+        Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(ordered__Component_list, unassignedExecutors);
+        Collection<ExecutorDetails> executorsNotScheduled = new HashSet<ExecutorDetails>(unassignedExecutors);
+        Integer longestPriorityListSize = this.getLongestPriorityListSize(priorityToExecutorMap);
+        //Pick the first executor with priority one, then the 1st exec with prior
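The ranking step in the quoted `getPriorityToExecutorDetailsListMap` can be tried in isolation. The sketch below is a simplified stand-in, not the pull request's code: executors are plain strings and each component is just a list of its executors, where the real code uses `ExecutorDetails` and `Component`, and the `PriorityRanking` class name is made up for the example. It assumes, as the quoted code suggests, that the components arrive already ordered (there, by a `bfs` call starting from the spouts).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, simplified model of the ranking step.
final class PriorityRanking {
    // Maps rank (position of a component in the ordered list) to the
    // executors of that component that still need to be scheduled.
    static TreeMap<Integer, List<String>> rankUnassigned(
            List<List<String>> orderedComponents, Collection<String> unassigned) {
        TreeMap<Integer, List<String>> ranked = new TreeMap<Integer, List<String>>();
        int rank = 0;
        for (List<String> componentExecs : orderedComponents) {
            List<String> pending = new ArrayList<String>();
            for (String exec : componentExecs) {
                if (unassigned.contains(exec)) {
                    pending.add(exec);
                }
            }
            // A component with nothing pending still gets an (empty) entry,
            // matching the quoted code, which inserts the list before filling it.
            ranked.put(rank, pending);
            rank++;
        }
        return ranked;
    }

    public static void main(String[] args) {
        List<List<String>> components = Arrays.asList(
                Arrays.asList("spout-1", "spout-2"),
                Arrays.asList("bolt-a-1"),
                Arrays.asList("bolt-b-1", "bolt-b-2"));
        Collection<String> unassigned =
                new HashSet<String>(Arrays.asList("spout-2", "bolt-b-1", "bolt-b-2"));
        System.out.println(rankUnassigned(components, unassigned));
    }
}
```

A `TreeMap` keeps the ranks in ascending key order when iterated, which is what the quoted comment about keeping the components sorted relies on. Passing the unassigned executors as a `HashSet` keeps each `contains` check cheap.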
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r41347451
--- Diff: storm-core/src/jvm/backtype/storm/utils/Utils.java ---
@@ -18,10 +18,7 @@
 package backtype.storm.utils;
 
 import backtype.storm.Config;
-import backtype.storm.generated.AuthorizationException;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.ComponentObject;
-import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.*;
--- End diff --

will expand
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r41347349
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java ---
@@ -0,0 +1,478 @@
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user zhuoliu commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r41285663
--- Diff: storm-core/src/jvm/backtype/storm/utils/Utils.java ---
@@ -18,10 +18,7 @@
 package backtype.storm.utils;
 
 import backtype.storm.Config;
-import backtype.storm.generated.AuthorizationException;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.ComponentObject;
-import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.*;
--- End diff --

May expand this.
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user zhuoliu commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r41283467
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java ---
@@ -0,0 +1,478 @@
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user zhuoliu commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r41283265
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java ---
@@ -0,0 +1,478 @@
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/746#issuecomment-145261204 @revans2 Thanks for your review again! I just pushed a commit that contains the revisions you suggested.
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on the pull request: https://github.com/apache/storm/pull/746#issuecomment-145253628 @jerrypeng Just two more issues, both of which are relatively minor. After them I am +1 for merging this in. Then hopefully development work on RAS can move forward in open source.
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r41087631 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java --- @@ -0,0 +1,478 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package backtype.storm.scheduler.resource.strategies; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Queue; +import java.util.TreeMap; +import java.util.HashSet; +import java.util.Iterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.scheduler.Cluster; +import backtype.storm.scheduler.ExecutorDetails; +import backtype.storm.scheduler.Topologies; +import backtype.storm.scheduler.TopologyDetails; +import backtype.storm.scheduler.WorkerSlot; +import backtype.storm.scheduler.resource.Component; +import backtype.storm.scheduler.resource.RAS_Node; + +public class ResourceAwareStrategy implements IStrategy { +private Logger LOG = null; +private Topologies _topologies; +private Cluster _cluster; +//Map key is the supervisor id and the value is the corresponding RAS_Node Object +private Map _availNodes; +private RAS_Node refNode = null; +/** + * supervisor id -> Node + */ +private Map _nodes; +private Map> _clusterInfo; + +private final double CPU_WEIGHT = 1.0; +private final double MEM_WEIGHT = 1.0; +private final double NETWORK_WEIGHT = 1.0; + +public ResourceAwareStrategy(Cluster cluster, Topologies topologies) { +_topologies = topologies; +_cluster = cluster; +_nodes = RAS_Node.getAllNodesFrom(cluster, _topologies); +_availNodes = this.getAvailNodes(); +this.LOG = LoggerFactory.getLogger(this.getClass()); +_clusterInfo = cluster.getNetworkTopography(); +LOG.info(this.getClusterInfo()); +} + +//the returned TreeMap keeps the Components sorted +private TreeMap> getPriorityToExecutorDetailsListMap( +Queue ordered__Component_list, Collection unassignedExecutors) { +TreeMap> retMap = new TreeMap>(); +Integer rank = 0; +for (Component ras_comp : ordered__Component_list) { +retMap.put(rank, new ArrayList()); +for(ExecutorDetails exec : ras_comp.execs) { 
+if(unassignedExecutors.contains(exec)) { +retMap.get(rank).add(exec); +} +} +rank++; +} +return retMap; +} + +public Map> schedule(TopologyDetails td) { +if (_availNodes.size() <= 0) { +LOG.warn("No available nodes to schedule tasks on!"); +return null; +} +Collection unassignedExecutors = _cluster.getUnassignedExecutors(td); +Map> schedulerAssignmentMap = new HashMap>(); +LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors); +Collection scheduledTasks = new ArrayList(); +List spouts = this.getSpouts(_topologies, td); + +if (spouts.size() == 0) { +LOG.error("Cannot find a Spout!"); +return null; +} + +Queue ordered__Component_list = bfs(_topologies, td, spouts); + +Map> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(ordered__Component_list, unassignedExecutors); +Collection executorsNotScheduled = new HashSet(unassignedExecutors); +Integer longestPriorityListSize = this.getLongestPriorityListSize(priorityToExecutorMap); +//Pick the first executor with priority one, then the 1st exec with priority
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r41087614 --- Diff: storm-core/src/jvm/backtype/storm/networktopography/DefaultRackDNSToSwitchMapping.java --- @@ -0,0 +1,39 @@ +package backtype.storm.networktopography; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class implements the {@link DNSToSwitchMapping} interface + *It returns the DEFAULT_RACK for every host. + */ +public final class DefaultRackDNSToSwitchMapping extends CachedDNSToSwitchMapping { --- End diff -- I know we borrowed this from Hadoop, and it looks like Hadoop has the same issue, but we cannot subclass CachedDNSToSwitchMapping to get caching. It does not work that way. Please either fix CachedDNSToSwitchMapping to actually do caching when it is a parent class, or remove it altogether and have its children inherit directly from AbstractDNSToSwitchMapping.
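The caching concern above can be illustrated with a minimal sketch, assuming the cache is meant to wrap a raw mapping rather than be inherited from. The names SwitchMapping, DefaultRackMapping and CachingMapping are hypothetical, simplified stand-ins and are not the classes in this pull request.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for a DNS-to-switch mapping interface.
interface SwitchMapping {
    Map<String, String> resolve(List<String> hostNames);
}

// Raw mapping: returns a default rack for every host and counts lookups.
final class DefaultRackMapping implements SwitchMapping {
    static final String DEFAULT_RACK = "/default-rack";
    int lookups = 0;

    @Override
    public Map<String, String> resolve(List<String> hostNames) {
        Map<String, String> result = new HashMap<>();
        for (String host : hostNames) {
            lookups++;
            result.put(host, DEFAULT_RACK);
        }
        return result;
    }
}

// Caching by composition: wraps any raw mapping and remembers its answers.
final class CachingMapping implements SwitchMapping {
    private final SwitchMapping raw;
    private final Map<String, String> cache = new ConcurrentHashMap<>();

    CachingMapping(SwitchMapping raw) {
        this.raw = raw;
    }

    @Override
    public Map<String, String> resolve(List<String> hostNames) {
        // Only hosts that are not cached yet are passed to the raw mapping.
        List<String> misses = new ArrayList<>();
        for (String host : hostNames) {
            if (!cache.containsKey(host)) {
                misses.add(host);
            }
        }
        if (!misses.isEmpty()) {
            cache.putAll(raw.resolve(misses));
        }
        Map<String, String> result = new HashMap<>();
        for (String host : hostNames) {
            result.put(host, cache.get(host));
        }
        return result;
    }
}
```

In this sketch a class that merely extended CachingMapping and overrode resolve would bypass the cache entirely, which is one way to read the objection; wrapping keeps the cache in front of whatever raw mapping is supplied.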
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/746#issuecomment-145155422 @knusbaum @rfarivar @zhuoliu
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/746#issuecomment-145152436 Done making modifications based on comments. Can I get some more reviews, please?
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/746#issuecomment-145152237 @revans2 Thanks for the review!
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40850849 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java --- @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package backtype.storm.scheduler.resource.strategies; + +import java.util.Collection; +import java.util.Map; + +import backtype.storm.scheduler.Topologies; +import backtype.storm.scheduler.ExecutorDetails; +import backtype.storm.scheduler.TopologyDetails; +import backtype.storm.scheduler.WorkerSlot; +import backtype.storm.scheduler.resource.RAS_Node; + +public interface IStrategy { --- End diff -- will add
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40850542 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java --- @@ -0,0 +1,117 @@ +package backtype.storm.scheduler.resource; + +import backtype.storm.Config; +import backtype.storm.generated.Bolt; +import backtype.storm.generated.SpoutSpec; +import backtype.storm.generated.StormTopology; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * Created by jerrypeng on 9/22/15. + */ +public class ResourceUtils { --- End diff -- will remove
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40850487 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java --- @@ -0,0 +1,117 @@ +package backtype.storm.scheduler.resource; --- End diff -- will add
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40850307 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java --- @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package backtype.storm.scheduler.resource; + +import java.util.*; + +import backtype.storm.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.scheduler.Cluster; +import backtype.storm.scheduler.ExecutorDetails; +import backtype.storm.scheduler.IScheduler; +import backtype.storm.scheduler.Topologies; +import backtype.storm.scheduler.TopologyDetails; +import backtype.storm.scheduler.WorkerSlot; +import backtype.storm.scheduler.resource.strategies.ResourceAwareStrategy; + +public class ResourceAwareScheduler implements IScheduler { +private static final Logger LOG = LoggerFactory +.getLogger(ResourceAwareScheduler.class); +@SuppressWarnings("rawtypes") +private Map _conf; + +@Override +public void prepare(Map conf) { +_conf = conf; +} + +@Override +public void schedule(Topologies topologies, Cluster cluster) { +LOG.info("\n\n\nRerunning ResourceAwareScheduler..."); --- End diff -- will remove/change
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40849566 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java --- @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package backtype.storm.scheduler.resource; + +import java.util.*; --- End diff -- will change
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40849297 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java --- @@ -79,11 +95,302 @@ public StormTopology getTopology() { ret.put(executor, compId); } } - + return ret; } - + public Collection getExecutors() { return this.executorToComponent.keySet(); } + +private void initResourceList() { +_resourceList = new HashMap>(); +// Extract bolt memory info +if (this.topology.get_bolts() != null) { +for (Map.Entry bolt : this.topology.get_bolts().entrySet()) { +//the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad) +Map topology_resources = ResourceUtils.parseResources(bolt +.getValue().get_common().get_json_conf()); +ResourceUtils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (bolt.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} +// Extract spout memory info +if (this.topology.get_spouts() != null) { +for (Map.Entry spout : this.topology.get_spouts().entrySet()) { +Map topology_resources = ResourceUtils.parseResources(spout +.getValue().get_common().get_json_conf()); +ResourceUtils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (spout.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} else { +LOG.warn("Topology " + topologyId + " does not seem to have any spouts!"); +} +//schedule tasks that are not part of components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks) +for(ExecutorDetails exec : this.getExecutors()) { +if 
(_resourceList.containsKey(exec) == false) { +LOG.debug( +"Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}", +this.getExecutorToComponent().get(exec), +exec, + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)); +this.addDefaultResforExec(exec); +} +} +} + +private List componentToExecs(String comp) { +List execs = new ArrayList<>(); +for (Map.Entry entry : executorToComponent.entrySet()) { +if (entry.getValue().equals(comp)) { +execs.add(entry.getKey()); +} +} +return execs; +} + +/** + * Returns a representation of the non-system components of the topology graph + * Each Component object in the returning map is populated with the list of its + * parents, children and execs assigned to that component. + * @return a map of components + */ +public Map getComponents() { +Map all_comp = new HashMap(); + +StormTopology storm_topo = this.topology; +// spouts +if (storm_topo.get_spouts() != null) { +for (Map.Entry spoutEntry : storm_topo +.get_spouts().entrySet()) { +if (!Utils.isSystemId(spoutEntry.getKey())) { +Component newComp = null; +if (all_comp.containsKey(spoutEntry.getKey())) { +newComp = all_comp.get(spoutEntry.getKey()); +newComp.execs = componentToExecs(newComp.id); +} else { +newComp = new Component(spoutEntry.getKey()); +newComp.execs = componentToExecs(newComp.i
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40849422 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java --- @@ -79,11 +95,302 @@ public StormTopology getTopology() { ret.put(executor, compId); } } - + return ret; } - + public Collection getExecutors() { return this.executorToComponent.keySet(); } + +private void initResourceList() { +_resourceList = new HashMap>(); +// Extract bolt memory info +if (this.topology.get_bolts() != null) { +for (Map.Entry bolt : this.topology.get_bolts().entrySet()) { +//the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad) +Map topology_resources = ResourceUtils.parseResources(bolt +.getValue().get_common().get_json_conf()); +ResourceUtils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (bolt.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} +// Extract spout memory info +if (this.topology.get_spouts() != null) { +for (Map.Entry spout : this.topology.get_spouts().entrySet()) { +Map topology_resources = ResourceUtils.parseResources(spout +.getValue().get_common().get_json_conf()); +ResourceUtils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (spout.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} else { +LOG.warn("Topology " + topologyId + " does not seem to have any spouts!"); +} +//schedule tasks that are not part of components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks) +for(ExecutorDetails exec : this.getExecutors()) { +if 
(_resourceList.containsKey(exec) == false) { +LOG.debug( +"Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}", +this.getExecutorToComponent().get(exec), +exec, + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)); +this.addDefaultResforExec(exec); +} +} +} + +private List componentToExecs(String comp) { +List execs = new ArrayList<>(); +for (Map.Entry entry : executorToComponent.entrySet()) { +if (entry.getValue().equals(comp)) { +execs.add(entry.getKey()); +} +} +return execs; +} + +/** + * Returns a representation of the non-system components of the topology graph + * Each Component object in the returning map is populated with the list of its + * parents, children and execs assigned to that component. + * @return a map of components + */ +public Map getComponents() { +Map all_comp = new HashMap(); + +StormTopology storm_topo = this.topology; +// spouts +if (storm_topo.get_spouts() != null) { +for (Map.Entry spoutEntry : storm_topo +.get_spouts().entrySet()) { +if (!Utils.isSystemId(spoutEntry.getKey())) { +Component newComp = null; +if (all_comp.containsKey(spoutEntry.getKey())) { +newComp = all_comp.get(spoutEntry.getKey()); +newComp.execs = componentToExecs(newComp.id); +} else { +newComp = new Component(spoutEntry.getKey()); +newComp.execs = componentToExecs(newComp.i
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40849301 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java --- @@ -79,11 +95,302 @@ public StormTopology getTopology() { ret.put(executor, compId); } } - + return ret; } - + public Collection getExecutors() { return this.executorToComponent.keySet(); } + +private void initResourceList() { +_resourceList = new HashMap>(); +// Extract bolt memory info +if (this.topology.get_bolts() != null) { +for (Map.Entry bolt : this.topology.get_bolts().entrySet()) { +//the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad) +Map topology_resources = ResourceUtils.parseResources(bolt +.getValue().get_common().get_json_conf()); +ResourceUtils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (bolt.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} +// Extract spout memory info +if (this.topology.get_spouts() != null) { +for (Map.Entry spout : this.topology.get_spouts().entrySet()) { +Map topology_resources = ResourceUtils.parseResources(spout +.getValue().get_common().get_json_conf()); +ResourceUtils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (spout.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} else { +LOG.warn("Topology " + topologyId + " does not seem to have any spouts!"); +} +//schedule tasks that are not part of components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks) +for(ExecutorDetails exec : this.getExecutors()) { +if 
(_resourceList.containsKey(exec) == false) { +LOG.debug( +"Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}", +this.getExecutorToComponent().get(exec), +exec, + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)); +this.addDefaultResforExec(exec); +} +} +} + +private List componentToExecs(String comp) { +List execs = new ArrayList<>(); +for (Map.Entry entry : executorToComponent.entrySet()) { +if (entry.getValue().equals(comp)) { +execs.add(entry.getKey()); +} +} +return execs; +} + +/** + * Returns a representation of the non-system components of the topology graph + * Each Component object in the returning map is populated with the list of its + * parents, children and execs assigned to that component. + * @return a map of components + */ +public Map getComponents() { +Map all_comp = new HashMap(); + +StormTopology storm_topo = this.topology; +// spouts +if (storm_topo.get_spouts() != null) { +for (Map.Entry spoutEntry : storm_topo +.get_spouts().entrySet()) { +if (!Utils.isSystemId(spoutEntry.getKey())) { +Component newComp = null; +if (all_comp.containsKey(spoutEntry.getKey())) { +newComp = all_comp.get(spoutEntry.getKey()); +newComp.execs = componentToExecs(newComp.id); +} else { +newComp = new Component(spoutEntry.getKey()); +newComp.execs = componentToExecs(newComp.i
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40849279 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java --- @@ -79,11 +95,302 @@ public StormTopology getTopology() { ret.put(executor, compId); } } - + return ret; } - + public Collection getExecutors() { return this.executorToComponent.keySet(); } + +private void initResourceList() { +_resourceList = new HashMap>(); +// Extract bolt memory info +if (this.topology.get_bolts() != null) { +for (Map.Entry bolt : this.topology.get_bolts().entrySet()) { +//the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad) +Map topology_resources = ResourceUtils.parseResources(bolt +.getValue().get_common().get_json_conf()); +ResourceUtils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (bolt.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} +// Extract spout memory info +if (this.topology.get_spouts() != null) { +for (Map.Entry spout : this.topology.get_spouts().entrySet()) { +Map topology_resources = ResourceUtils.parseResources(spout +.getValue().get_common().get_json_conf()); +ResourceUtils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (spout.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} else { +LOG.warn("Topology " + topologyId + " does not seem to have any spouts!"); +} +//schedule tasks that are not part of components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks) +for(ExecutorDetails exec : this.getExecutors()) { +if 
(_resourceList.containsKey(exec) == false) { +LOG.debug( +"Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}", +this.getExecutorToComponent().get(exec), +exec, + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)); +this.addDefaultResforExec(exec); +} +} +} + +private List componentToExecs(String comp) { +List execs = new ArrayList<>(); +for (Map.Entry entry : executorToComponent.entrySet()) { +if (entry.getValue().equals(comp)) { +execs.add(entry.getKey()); +} +} +return execs; +} + +/** + * Returns a representation of the non-system components of the topology graph + * Each Component object in the returning map is populated with the list of its + * parents, children and execs assigned to that component. + * @return a map of components + */ +public Map getComponents() { +Map all_comp = new HashMap(); + +StormTopology storm_topo = this.topology; +// spouts +if (storm_topo.get_spouts() != null) { +for (Map.Entry spoutEntry : storm_topo +.get_spouts().entrySet()) { +if (!Utils.isSystemId(spoutEntry.getKey())) { +Component newComp = null; +if (all_comp.containsKey(spoutEntry.getKey())) { +newComp = all_comp.get(spoutEntry.getKey()); +newComp.execs = componentToExecs(newComp.id); +} else { +newComp = new Component(spoutEntry.getKey()); +newComp.execs = componentToExecs(newComp.i
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40849130 --- Diff: storm-core/src/jvm/backtype/storm/networktopography/AbstractDNSToSwitchMapping.java --- @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package backtype.storm.networktopography; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * This is a base class for DNS to Switch mappings. It is not mandatory to + * derive {@link DNSToSwitchMapping} implementations from it, but it is strongly + * recommended, as it makes it easy for the developers to add new methods + * to this base class that are automatically picked up by all implementations. + * + * + */ +public abstract class AbstractDNSToSwitchMapping +implements DNSToSwitchMapping { + +// private Configuration conf; --- End diff -- will remove
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40848979 --- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java --- @@ -442,4 +446,35 @@ public static String submitJar(Map conf, String localJar, ProgressListener liste */ public void onCompleted(String srcFile, String targetFile, long totalBytes); } + + --- End diff -- will remove
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on the pull request: https://github.com/apache/storm/pull/746#issuecomment-144514012 @jerrypeng, for the most part the code looks really good. I would mostly just like to see a lot of the debug logging turned into LOG.debug statements instead of LOG.info statements. For everyone else, I think it is important to point out that resource aware scheduling is still an experimental feature. The APIs should be fairly stable, but there is a lot more work to be done on it before it is production ready.
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40838906 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java --- @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package backtype.storm.scheduler.resource.strategies; + +import java.util.Collection; +import java.util.Map; + +import backtype.storm.scheduler.Topologies; +import backtype.storm.scheduler.ExecutorDetails; +import backtype.storm.scheduler.TopologyDetails; +import backtype.storm.scheduler.WorkerSlot; +import backtype.storm.scheduler.resource.RAS_Node; + +public interface IStrategy { --- End diff -- Can we add a comment about what a Strategy is?
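The request above is for documentation on the strategy interface. The diff does not show the methods of IStrategy, so the following is only a sketch of what such a comment could look like on a hypothetical stand-in interface. The names StrategySketch and RoundRobinStrategySketch, the schedule signature, and the use of plain strings for executors and slots are all assumptions made for illustration, not the PR's actual API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * A scheduling strategy decides where a topology's executors should run.
 * Given the executors that still need a home and the worker slots that are
 * free, it returns a proposed assignment; applying that assignment to the
 * cluster is left to the caller.
 *
 * Hypothetical stand-in for IStrategy: executors and slots are plain strings.
 */
interface StrategySketch {
    /**
     * @param executors ids of the executors to place
     * @param slots     ids of the available worker slots
     * @return slot id mapped to the executors assigned to it, or null when
     *         nothing can be scheduled (for example, no slots are available)
     */
    Map<String, Collection<String>> schedule(List<String> executors, List<String> slots);
}

/** Simplest possible strategy: deal executors out to the slots in turn. */
final class RoundRobinStrategySketch implements StrategySketch {
    @Override
    public Map<String, Collection<String>> schedule(List<String> executors, List<String> slots) {
        if (slots.isEmpty()) {
            return null;
        }
        Map<String, Collection<String>> assignment = new LinkedHashMap<>();
        for (int i = 0; i < executors.size(); i++) {
            String slot = slots.get(i % slots.size());
            assignment.computeIfAbsent(slot, k -> new ArrayList<>()).add(executors.get(i));
        }
        return assignment;
    }
}
```

A class-level comment of this kind states what the strategy is responsible for and what it leaves to the caller, which is the information a reader of the interface is missing.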
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40838824 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java --- @@ -0,0 +1,117 @@ +package backtype.storm.scheduler.resource; + +import backtype.storm.Config; +import backtype.storm.generated.Bolt; +import backtype.storm.generated.SpoutSpec; +import backtype.storm.generated.StormTopology; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * Created by jerrypeng on 9/22/15. + */ +public class ResourceUtils { --- End diff -- git should take care of history instead of code comments.
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40838769 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java --- @@ -0,0 +1,117 @@ +package backtype.storm.scheduler.resource; --- End diff -- This file needs the apache header.
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40838571 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java --- @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package backtype.storm.scheduler.resource; + +import java.util.*; + +import backtype.storm.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.scheduler.Cluster; +import backtype.storm.scheduler.ExecutorDetails; +import backtype.storm.scheduler.IScheduler; +import backtype.storm.scheduler.Topologies; +import backtype.storm.scheduler.TopologyDetails; +import backtype.storm.scheduler.WorkerSlot; +import backtype.storm.scheduler.resource.strategies.ResourceAwareStrategy; + +public class ResourceAwareScheduler implements IScheduler { +private static final Logger LOG = LoggerFactory +.getLogger(ResourceAwareScheduler.class); +@SuppressWarnings("rawtypes") +private Map _conf; + +@Override +public void prepare(Map conf) { +_conf = conf; +} + +@Override +public void schedule(Topologies topologies, Cluster cluster) { +LOG.info("\n\n\nRerunning ResourceAwareScheduler..."); --- End diff -- Change this to LOG.debug or remove it.
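The point of the comment above is that a message emitted on every scheduling pass belongs at debug level, so that it stays out of the logs under the usual INFO configuration. The PR uses SLF4J, which is not part of the JDK, so the sketch below swaps in java.util.logging to stay self-contained; there Level.FINE plays roughly the role of SLF4J's debug. The class name SchedulerLoggingSketch and its helper methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

final class SchedulerLoggingSketch {
    private static final Logger LOG =
            Logger.getLogger(SchedulerLoggingSketch.class.getName());

    /** Collects the records that reach it, so a caller can see what was actually logged. */
    static final class CapturingHandler extends Handler {
        final List<LogRecord> records = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            if (isLoggable(record)) {
                records.add(record);
            }
        }

        @Override
        public void flush() { }

        @Override
        public void close() { }
    }

    /** Stand-in for the scheduler's schedule() method: the per-pass message is logged at FINE. */
    static void schedule() {
        LOG.fine("Rerunning scheduler...");
    }

    /** Runs one scheduling pass with the logger set to the given level and counts emitted records. */
    static int messagesLoggedAt(Level level) {
        CapturingHandler handler = new CapturingHandler();
        LOG.setUseParentHandlers(false); // keep the sketch from printing to the console
        LOG.addHandler(handler);
        LOG.setLevel(level);
        try {
            schedule();
        } finally {
            LOG.removeHandler(handler);
        }
        return handler.records.size();
    }
}
```

With the logger at INFO the per-pass message is dropped before it reaches any handler, and it reappears only when an operator lowers the level to investigate a problem.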
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40838543 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java --- @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package backtype.storm.scheduler.resource; + +import java.util.*; --- End diff -- I don't really like .* imports; they can result in errors if new classes are added to that package. It is not a big deal, but it would be nice to avoid them.
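To illustrate the concern about wildcard imports: if a file contains both `import java.util.*;` and `import java.awt.*;`, the simple name `List` is ambiguous and the file does not compile, and the same kind of clash can appear later when a package gains a new class. A minimal sketch of the alternative, with explicit single-type imports, follows. The class ExplicitImportsSketch and its method are hypothetical and only loosely modelled on the executor-to-component lookups in this PR.

```java
// Explicit single-type imports: every simple name used below resolves to
// exactly one class, no matter what is later added to java.util.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ExplicitImportsSketch {
    /** Inverts an executor-to-component map into component-to-executors. */
    static Map<String, List<String>> executorsByComponent(Map<String, String> executorToComponent) {
        Map<String, List<String>> byComponent = new HashMap<>();
        for (Map.Entry<String, String> entry : executorToComponent.entrySet()) {
            byComponent
                .computeIfAbsent(entry.getValue(), k -> new ArrayList<>())
                .add(entry.getKey());
        }
        return byComponent;
    }
}
```

Most IDEs can expand wildcard imports automatically, so the change is usually mechanical.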
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40838272 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java --- @@ -79,11 +95,302 @@ public StormTopology getTopology() { ret.put(executor, compId); } } - + return ret; } - + public Collection getExecutors() { return this.executorToComponent.keySet(); } + +private void initResourceList() { +_resourceList = new HashMap>(); +// Extract bolt memory info +if (this.topology.get_bolts() != null) { +for (Map.Entry bolt : this.topology.get_bolts().entrySet()) { +//the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad) +Map topology_resources = ResourceUtils.parseResources(bolt +.getValue().get_common().get_json_conf()); +ResourceUtils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (bolt.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} +// Extract spout memory info +if (this.topology.get_spouts() != null) { +for (Map.Entry spout : this.topology.get_spouts().entrySet()) { +Map topology_resources = ResourceUtils.parseResources(spout +.getValue().get_common().get_json_conf()); +ResourceUtils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (spout.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} else { +LOG.warn("Topology " + topologyId + " does not seem to have any spouts!"); +} +//schedule tasks that are not part of components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks) +for(ExecutorDetails exec : this.getExecutors()) { +if (_resourceList.containsKey(exec) 
== false) { +LOG.debug( +"Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}", +this.getExecutorToComponent().get(exec), +exec, + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)); +this.addDefaultResforExec(exec); +} +} +} + +private List componentToExecs(String comp) { +List execs = new ArrayList<>(); +for (Map.Entry entry : executorToComponent.entrySet()) { +if (entry.getValue().equals(comp)) { +execs.add(entry.getKey()); +} +} +return execs; +} + +/** + * Returns a representation of the non-system components of the topology graph + * Each Component object in the returning map is populated with the list of its + * parents, children and execs assigned to that component. + * @return a map of components + */ +public Map getComponents() { +Map all_comp = new HashMap(); + +StormTopology storm_topo = this.topology; +// spouts +if (storm_topo.get_spouts() != null) { +for (Map.Entry spoutEntry : storm_topo +.get_spouts().entrySet()) { +if (!Utils.isSystemId(spoutEntry.getKey())) { +Component newComp = null; +if (all_comp.containsKey(spoutEntry.getKey())) { +newComp = all_comp.get(spoutEntry.getKey()); +newComp.execs = componentToExecs(newComp.id); +} else { +newComp = new Component(spoutEntry.getKey()); +newComp.execs = componentToExecs(newComp.id)
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40838180 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java --- @@ -79,11 +95,302 @@ public StormTopology getTopology() { ret.put(executor, compId); } } - + return ret; } - + public Collection getExecutors() { return this.executorToComponent.keySet(); } + +private void initResourceList() { +_resourceList = new HashMap>(); +// Extract bolt memory info +if (this.topology.get_bolts() != null) { +for (Map.Entry bolt : this.topology.get_bolts().entrySet()) { +//the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad) +Map topology_resources = ResourceUtils.parseResources(bolt +.getValue().get_common().get_json_conf()); +ResourceUtils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (bolt.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} +// Extract spout memory info +if (this.topology.get_spouts() != null) { +for (Map.Entry spout : this.topology.get_spouts().entrySet()) { +Map topology_resources = ResourceUtils.parseResources(spout +.getValue().get_common().get_json_conf()); +ResourceUtils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (spout.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} else { +LOG.warn("Topology " + topologyId + " does not seem to have any spouts!"); +} +//schedule tasks that are not part of components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks) +for(ExecutorDetails exec : this.getExecutors()) { +if (_resourceList.containsKey(exec) 
== false) { +LOG.debug( +"Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}", +this.getExecutorToComponent().get(exec), +exec, + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)); +this.addDefaultResforExec(exec); +} +} +} + +private List componentToExecs(String comp) { +List execs = new ArrayList<>(); +for (Map.Entry entry : executorToComponent.entrySet()) { +if (entry.getValue().equals(comp)) { +execs.add(entry.getKey()); +} +} +return execs; +} + +/** + * Returns a representation of the non-system components of the topology graph + * Each Component object in the returning map is populated with the list of its + * parents, children and execs assigned to that component. + * @return a map of components + */ +public Map getComponents() { +Map all_comp = new HashMap(); + +StormTopology storm_topo = this.topology; +// spouts +if (storm_topo.get_spouts() != null) { +for (Map.Entry spoutEntry : storm_topo +.get_spouts().entrySet()) { +if (!Utils.isSystemId(spoutEntry.getKey())) { +Component newComp = null; +if (all_comp.containsKey(spoutEntry.getKey())) { +newComp = all_comp.get(spoutEntry.getKey()); +newComp.execs = componentToExecs(newComp.id); +} else { +newComp = new Component(spoutEntry.getKey()); +newComp.execs = componentToExecs(newComp.id)
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40838121 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java --- @@ -79,11 +95,302 @@ public StormTopology getTopology() { ret.put(executor, compId); } } - + return ret; } - + public Collection getExecutors() { return this.executorToComponent.keySet(); } + +private void initResourceList() { +_resourceList = new HashMap>(); +// Extract bolt memory info +if (this.topology.get_bolts() != null) { +for (Map.Entry bolt : this.topology.get_bolts().entrySet()) { +//the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad) +Map topology_resources = ResourceUtils.parseResources(bolt +.getValue().get_common().get_json_conf()); +ResourceUtils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (bolt.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} +// Extract spout memory info +if (this.topology.get_spouts() != null) { +for (Map.Entry spout : this.topology.get_spouts().entrySet()) { +Map topology_resources = ResourceUtils.parseResources(spout +.getValue().get_common().get_json_conf()); +ResourceUtils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (spout.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} else { +LOG.warn("Topology " + topologyId + " does not seem to have any spouts!"); +} +//schedule tasks that are not part of components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks) +for(ExecutorDetails exec : this.getExecutors()) { +if (_resourceList.containsKey(exec) 
== false) { +LOG.debug( +"Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}", +this.getExecutorToComponent().get(exec), +exec, + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)); +this.addDefaultResforExec(exec); +} +} +} + +private List componentToExecs(String comp) { +List execs = new ArrayList<>(); +for (Map.Entry entry : executorToComponent.entrySet()) { +if (entry.getValue().equals(comp)) { +execs.add(entry.getKey()); +} +} +return execs; +} + +/** + * Returns a representation of the non-system components of the topology graph + * Each Component object in the returning map is populated with the list of its + * parents, children and execs assigned to that component. + * @return a map of components + */ +public Map getComponents() { +Map all_comp = new HashMap(); + +StormTopology storm_topo = this.topology; +// spouts +if (storm_topo.get_spouts() != null) { +for (Map.Entry spoutEntry : storm_topo +.get_spouts().entrySet()) { +if (!Utils.isSystemId(spoutEntry.getKey())) { +Component newComp = null; +if (all_comp.containsKey(spoutEntry.getKey())) { +newComp = all_comp.get(spoutEntry.getKey()); +newComp.execs = componentToExecs(newComp.id); +} else { +newComp = new Component(spoutEntry.getKey()); +newComp.execs = componentToExecs(newComp.id)
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40838080 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java --- @@ -79,11 +95,302 @@ public StormTopology getTopology() { ret.put(executor, compId); } } - + return ret; } - + public Collection getExecutors() { return this.executorToComponent.keySet(); } + +private void initResourceList() { +_resourceList = new HashMap>(); +// Extract bolt memory info +if (this.topology.get_bolts() != null) { +for (Map.Entry bolt : this.topology.get_bolts().entrySet()) { +//the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad) +Map topology_resources = ResourceUtils.parseResources(bolt +.getValue().get_common().get_json_conf()); +ResourceUtils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (bolt.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} +// Extract spout memory info +if (this.topology.get_spouts() != null) { +for (Map.Entry spout : this.topology.get_spouts().entrySet()) { +Map topology_resources = ResourceUtils.parseResources(spout +.getValue().get_common().get_json_conf()); +ResourceUtils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (spout.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} else { +LOG.warn("Topology " + topologyId + " does not seem to have any spouts!"); +} +//schedule tasks that are not part of components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks) +for(ExecutorDetails exec : this.getExecutors()) { +if (_resourceList.containsKey(exec) 
== false) { +LOG.debug( +"Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}", +this.getExecutorToComponent().get(exec), +exec, + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)); +this.addDefaultResforExec(exec); +} +} +} + +private List componentToExecs(String comp) { +List execs = new ArrayList<>(); +for (Map.Entry entry : executorToComponent.entrySet()) { +if (entry.getValue().equals(comp)) { +execs.add(entry.getKey()); +} +} +return execs; +} + +/** + * Returns a representation of the non-system components of the topology graph + * Each Component object in the returning map is populated with the list of its + * parents, children and execs assigned to that component. + * @return a map of components + */ +public Map getComponents() { +Map all_comp = new HashMap(); + +StormTopology storm_topo = this.topology; +// spouts +if (storm_topo.get_spouts() != null) { +for (Map.Entry spoutEntry : storm_topo +.get_spouts().entrySet()) { +if (!Utils.isSystemId(spoutEntry.getKey())) { +Component newComp = null; +if (all_comp.containsKey(spoutEntry.getKey())) { +newComp = all_comp.get(spoutEntry.getKey()); +newComp.execs = componentToExecs(newComp.id); +} else { +newComp = new Component(spoutEntry.getKey()); +newComp.execs = componentToExecs(newComp.id)
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40837655 --- Diff: storm-core/src/jvm/backtype/storm/networktopography/AbstractDNSToSwitchMapping.java --- @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package backtype.storm.networktopography; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * This is a base class for DNS to Switch mappings. It is not mandatory to + * derive {@link DNSToSwitchMapping} implementations from it, but it is strongly + * recommended, as it makes it easy for the developers to add new methods + * to this base class that are automatically picked up by all implementations. + * + * + */ +public abstract class AbstractDNSToSwitchMapping +implements DNSToSwitchMapping { + +// private Configuration conf; --- End diff -- Let's remove this if it is not used.
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40837456 --- Diff: storm-core/src/jvm/backtype/storm/generated/AlreadyAliveException.java --- @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-2-6") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-9-30") --- End diff -- Can we revert the generated files that only have a date change in them?
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40837399 --- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java --- @@ -442,4 +446,35 @@ public static String submitJar(Map conf, String localJar, ProgressListener liste */ public void onCompleted(String srcFile, String targetFile, long totalBytes); } + + +private static void validateConfs(Map stormConf, StormTopology topology) throws IllegalArgumentException { +double largestMemReq = getMaxExecutorMemoryUsageForTopo(topology, stormConf); +Double topologyWorkerMaxHeapSize = Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB)); +if(topologyWorkerMaxHeapSize < largestMemReq) { +throw new IllegalArgumentException("Topology will not be able to be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB=" + +Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB)) + " < " ++ largestMemReq + " (Largest memory requirement of a component in the topology). Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount"); +} +} + + --- End diff -- Extra line
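The validateConfs method in the diff above rejects a topology at submit time when its largest per-component memory requirement exceeds the worker's maximum heap size. A self-contained sketch of that check follows. The class name, the MAX_HEAP_KEY string, and the explicit handling of a missing or non-numeric value are assumptions for illustration; the diff itself relies on Storm's Config and Utils classes, which are not reproduced here.

```java
import java.util.Map;

final class HeapSizeValidationSketch {
    // Hypothetical key string standing in for Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB.
    static final String MAX_HEAP_KEY = "topology.worker.max.heap.size.mb";

    /**
     * Throws IllegalArgumentException when the largest memory requirement of
     * any component cannot fit into a single worker's heap.
     */
    static void validate(Map<String, Object> conf, double largestMemReq) {
        Object raw = conf.get(MAX_HEAP_KEY);
        if (!(raw instanceof Number)) {
            // Assumption: treat a missing or non-numeric setting as a configuration error.
            throw new IllegalArgumentException(MAX_HEAP_KEY + " must be set to a number");
        }
        double maxHeap = ((Number) raw).doubleValue();
        if (maxHeap < largestMemReq) {
            throw new IllegalArgumentException(
                "Topology cannot be scheduled: " + MAX_HEAP_KEY + "=" + maxHeap
                + " < " + largestMemReq
                + " (largest memory requirement of a component in the topology)."
                + " Consider raising " + MAX_HEAP_KEY + ".");
        }
    }
}
```

Reading the configured value once into a local variable also avoids the second lookup that the original code performs when building the error message.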
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40837341 --- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java --- @@ -442,4 +446,35 @@ public static String submitJar(Map conf, String localJar, ProgressListener liste */ public void onCompleted(String srcFile, String targetFile, long totalBytes); } + + --- End diff -- Extra space
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on the pull request: https://github.com/apache/storm/pull/746#issuecomment-144451299 upmerged!
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40735548 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/Topologies.java --- @@ -21,10 +21,17 @@ import java.util.HashMap; import java.util.Map; +import backtype.storm.scheduler.resource.RAS_Component; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class Topologies { Map topologies; Map nameToId; - +Map> _allRAS_Components; + +private static final Logger LOG = LoggerFactory.getLogger(Topologies.class); --- End diff -- will remove
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40735533 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java --- @@ -79,11 +96,304 @@ public StormTopology getTopology() { ret.put(executor, compId); } } - + return ret; } - + public Collection getExecutors() { return this.executorToComponent.keySet(); } + +private void initResourceList() { +_resourceList = new HashMap>(); +// Extract bolt memory info +if (this.topology.get_bolts() != null) { +for (Map.Entry bolt : this.topology.get_bolts().entrySet()) { +//the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad) +Map topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(bolt +.getValue().get_common().get_json_conf()); + backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (bolt.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} else { +LOG.warn("Topology " + topologyId + " does not seem to have any bolts!"); +} +// Extract spout memory info +if (this.topology.get_spouts() != null) { +for (Map.Entry spout : this.topology.get_spouts().entrySet()) { +Map topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(spout +.getValue().get_common().get_json_conf()); + backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (spout.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} else { +LOG.warn("Topology " + topologyId + " does not seem to have any spouts!"); +} +//schedule tasks that are not part of 
components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks) +for(ExecutorDetails exec : this.getExecutors()) { +if (_resourceList.containsKey(exec) == false) { +LOG.info( +"Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}", +this.getExecutorToComponent().get(exec), +exec, + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)); +this.addDefaultResforExec(exec); +} +} +} + +private List componentToExecs(String comp) { +List execs = new ArrayList<>(); +for (Map.Entry entry : executorToComponent.entrySet()) { +if (entry.getValue().equals(comp)) { +execs.add(entry.getKey()); +} +} +return execs; +} + +/** + * Returns a representation of the non-system components of the topology graph + * Each Component object in the returning map is populated with the list of its + * parents, children and execs assigned to that component. + * @return a map of components + */ +public Map getRAS_Components() { --- End diff -- will rename
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40734516 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java --- @@ -79,11 +96,304 @@ public StormTopology getTopology() { ret.put(executor, compId); } } - + return ret; } - + public Collection getExecutors() { return this.executorToComponent.keySet(); } + +private void initResourceList() { +_resourceList = new HashMap>(); +// Extract bolt memory info +if (this.topology.get_bolts() != null) { +for (Map.Entry bolt : this.topology.get_bolts().entrySet()) { +//the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad) +Map topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(bolt +.getValue().get_common().get_json_conf()); + backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (bolt.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} else { +LOG.warn("Topology " + topologyId + " does not seem to have any bolts!"); +} +// Extract spout memory info +if (this.topology.get_spouts() != null) { +for (Map.Entry spout : this.topology.get_spouts().entrySet()) { +Map topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(spout +.getValue().get_common().get_json_conf()); + backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (spout.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} else { +LOG.warn("Topology " + topologyId + " does not seem to have any spouts!"); +} +//schedule tasks that are not part of 
components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks) +for(ExecutorDetails exec : this.getExecutors()) { +if (_resourceList.containsKey(exec) == false) { +LOG.info( --- End diff -- will change
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40734496 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java --- @@ -79,11 +96,304 @@ public StormTopology getTopology() { ret.put(executor, compId); } } - + return ret; } - + public Collection getExecutors() { return this.executorToComponent.keySet(); } + +private void initResourceList() { +_resourceList = new HashMap>(); +// Extract bolt memory info +if (this.topology.get_bolts() != null) { +for (Map.Entry bolt : this.topology.get_bolts().entrySet()) { +//the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad) +Map topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(bolt +.getValue().get_common().get_json_conf()); + backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (bolt.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} else { +LOG.warn("Topology " + topologyId + " does not seem to have any bolts!"); --- End diff -- will remove
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40734280 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java --- @@ -31,41 +42,47 @@ StormTopology topology; Map executorToComponent; int numWorkers; - +//>> +private Map> _resourceList; +//Scheduler this topology should be scheduled with +private String scheduler; --- End diff -- will delete
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40732961 --- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java --- @@ -442,4 +444,36 @@ public static String submitJar(Map conf, String localJar, ProgressListener liste */ public void onCompleted(String srcFile, String targetFile, long totalBytes); } + + +private static void validateConfs(Map stormConf, StormTopology topology) throws IllegalArgumentException { +LOG.info("Validating storm Confs..."); +double largestMemReq = getMaxExecutorMemoryUsageForTopo(topology, stormConf); +Double topologyWorkerMaxHeapSize = Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB)); +if(topologyWorkerMaxHeapSize < largestMemReq) { +throw new IllegalArgumentException("Topology will not be able to be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB=" + +Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB)) + " < " ++ largestMemReq + " (Largest memory requirement of a component in the topology). Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount"); +} +} + + +private static double getMaxExecutorMemoryUsageForTopo(StormTopology topology, Map topologyConf) { +double largestMemoryOperator = 0.0; +for(Map entry : backtype.storm.scheduler.resource.Utils.getBoltsResources(topology, topologyConf).values()) { --- End diff -- rename
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40732321 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java --- @@ -79,11 +96,304 @@ public StormTopology getTopology() { ret.put(executor, compId); } } - + return ret; } - + public Collection getExecutors() { return this.executorToComponent.keySet(); } + +private void initResourceList() { +_resourceList = new HashMap>(); +// Extract bolt memory info +if (this.topology.get_bolts() != null) { +for (Map.Entry bolt : this.topology.get_bolts().entrySet()) { +//the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad) +Map topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(bolt +.getValue().get_common().get_json_conf()); + backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (bolt.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} else { +LOG.warn("Topology " + topologyId + " does not seem to have any bolts!"); +} +// Extract spout memory info +if (this.topology.get_spouts() != null) { +for (Map.Entry spout : this.topology.get_spouts().entrySet()) { +Map topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(spout +.getValue().get_common().get_json_conf()); + backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (spout.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} else { +LOG.warn("Topology " + topologyId + " does not seem to have any spouts!"); +} +//schedule tasks that are not part of 
components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks) +for(ExecutorDetails exec : this.getExecutors()) { +if (_resourceList.containsKey(exec) == false) { +LOG.info( --- End diff -- Can we make this debug instead? I don't think we need to tell everyone all the time about all of the topology's resources.
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40732622 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java --- @@ -79,11 +96,304 @@ public StormTopology getTopology() { ret.put(executor, compId); } } - + return ret; } - + public Collection getExecutors() { return this.executorToComponent.keySet(); } + +private void initResourceList() { +_resourceList = new HashMap>(); +// Extract bolt memory info +if (this.topology.get_bolts() != null) { +for (Map.Entry bolt : this.topology.get_bolts().entrySet()) { +//the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad) +Map topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(bolt +.getValue().get_common().get_json_conf()); + backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (bolt.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} else { +LOG.warn("Topology " + topologyId + " does not seem to have any bolts!"); +} +// Extract spout memory info +if (this.topology.get_spouts() != null) { +for (Map.Entry spout : this.topology.get_spouts().entrySet()) { +Map topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(spout +.getValue().get_common().get_json_conf()); + backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (spout.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} else { +LOG.warn("Topology " + topologyId + " does not seem to have any spouts!"); +} +//schedule tasks that are not part of 
components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks) +for(ExecutorDetails exec : this.getExecutors()) { +if (_resourceList.containsKey(exec) == false) { +LOG.info( +"Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}", +this.getExecutorToComponent().get(exec), +exec, + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), + this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)); +this.addDefaultResforExec(exec); +} +} +} + +private List componentToExecs(String comp) { +List execs = new ArrayList<>(); +for (Map.Entry entry : executorToComponent.entrySet()) { +if (entry.getValue().equals(comp)) { +execs.add(entry.getKey()); +} +} +return execs; +} + +/** + * Returns a representation of the non-system components of the topology graph + * Each Component object in the returning map is populated with the list of its + * parents, children and execs assigned to that component. + * @return a map of components + */ +public Map getRAS_Components() { --- End diff -- I'm not sure that RAS_Component is the right name for this class or this method. The class has nothing to do with resources at all. Perhaps we can call it a Component and the method getComponentGraph instead?
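A rough sketch of what the rename suggested above could look like: a plain `Component` class that records only graph structure, returned by a method called `getComponentGraph`. Everything here is hypothetical and for illustration only. The field names (`parents`, `children`, `execs`) follow the quoted Javadoc, while the `ComponentGraphSketch` holder, the edge-list input, and the string executor names are assumptions, not the code in this pull request.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the class under review: it holds graph structure only
// and knows nothing about memory or CPU resources.
class Component {
    final String id;
    final List<String> parents = new ArrayList<>();
    final List<String> children = new ArrayList<>();
    final List<String> execs = new ArrayList<>();

    Component(String id) {
        this.id = id;
    }
}

class ComponentGraphSketch {
    // edges: each entry is {parentId, childId} (assumed input shape)
    // executorToComponent: executor name -> component id (assumed input shape)
    static Map<String, Component> getComponentGraph(List<String[]> edges,
                                                    Map<String, String> executorToComponent) {
        Map<String, Component> graph = new HashMap<>();
        // Record parent/child links for every edge, creating components on first sight.
        for (String[] edge : edges) {
            Component parent = graph.computeIfAbsent(edge[0], Component::new);
            Component child = graph.computeIfAbsent(edge[1], Component::new);
            parent.children.add(child.id);
            child.parents.add(parent.id);
        }
        // Attach each executor to the component it belongs to.
        for (Map.Entry<String, String> e : executorToComponent.entrySet()) {
            graph.computeIfAbsent(e.getValue(), Component::new).execs.add(e.getKey());
        }
        return graph;
    }
}
```

Because nothing in this class mentions resources, it could live outside the resource-aware scheduler package, which seems to be the point of the naming comment.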
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40732462 --- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java --- @@ -442,4 +444,36 @@ public static String submitJar(Map conf, String localJar, ProgressListener liste */ public void onCompleted(String srcFile, String targetFile, long totalBytes); } + + +private static void validateConfs(Map stormConf, StormTopology topology) throws IllegalArgumentException { +LOG.info("Validating storm Confs..."); --- End diff -- will delete
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40732244 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java --- @@ -79,11 +96,304 @@ public StormTopology getTopology() { ret.put(executor, compId); } } - + return ret; } - + public Collection getExecutors() { return this.executorToComponent.keySet(); } + +private void initResourceList() { +_resourceList = new HashMap>(); +// Extract bolt memory info +if (this.topology.get_bolts() != null) { +for (Map.Entry bolt : this.topology.get_bolts().entrySet()) { +//the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad) +Map topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(bolt +.getValue().get_common().get_json_conf()); + backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (bolt.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} else { +LOG.warn("Topology " + topologyId + " does not seem to have any bolts!"); +} +// Extract spout memory info +if (this.topology.get_spouts() != null) { +for (Map.Entry spout : this.topology.get_spouts().entrySet()) { +Map topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(spout +.getValue().get_common().get_json_conf()); + backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (spout.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} else { +LOG.warn("Topology " + topologyId + " does not seem to have any spouts!"); --- End diff -- Ok so this is rather odd, this 
should probably be a warning.
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40731979 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java --- @@ -31,41 +42,47 @@ StormTopology topology; Map executorToComponent; int numWorkers; - +//>> +private Map> _resourceList; +//Scheduler this topology should be scheduled with +private String scheduler; --- End diff -- This does not appear to be used anywhere.
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40732194 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java --- @@ -79,11 +96,304 @@ public StormTopology getTopology() { ret.put(executor, compId); } } - + return ret; } - + public Collection getExecutors() { return this.executorToComponent.keySet(); } + +private void initResourceList() { +_resourceList = new HashMap>(); +// Extract bolt memory info +if (this.topology.get_bolts() != null) { +for (Map.Entry bolt : this.topology.get_bolts().entrySet()) { +//the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad) +Map topology_resources = backtype.storm.scheduler.resource.Utils.parseResources(bolt +.getValue().get_common().get_json_conf()); + backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf); +for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { +if (bolt.getKey().equals(anExecutorToComponent.getValue())) { +_resourceList.put(anExecutorToComponent.getKey(), topology_resources); +} +} +} +} else { +LOG.warn("Topology " + topologyId + " does not seem to have any bolts!"); --- End diff -- Is this really a warning? I would almost rather see this removed.
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40731800 --- Diff: storm-core/src/jvm/backtype/storm/scheduler/Topologies.java --- @@ -21,10 +21,17 @@ import java.util.HashMap; import java.util.Map; +import backtype.storm.scheduler.resource.RAS_Component; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class Topologies { Map topologies; Map nameToId; - +Map> _allRAS_Components; + +private static final Logger LOG = LoggerFactory.getLogger(Topologies.class); --- End diff -- This does not look like it is being used. Not a big deal, but unless we use it we should probably remove it.
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40731631 --- Diff: storm-core/src/jvm/backtype/storm/Config.java --- @@ -1101,6 +1127,33 @@ public static final Object TOPOLOGY_TASKS_SCHEMA = ConfigValidation.IntegerValidator; /** + * The maximum amount of memory an instance of a spout/bolt will take on heap. This enables the scheduler + * to allocate slots on machines with enough available memory. + */ +public static final String TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB = "topology.component.resources.onheap.memory.mb"; +public static final Object TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB_SCHEMA = ConfigValidation.NonNegativeNumberValidator; + +/** + * The maximum amount of memory an instance of a spout/bolt will take off heap. This enables the scheduler + * to allocate slots on machines with enough available memory. + */ +public static final String TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB = "topology.component.resources.offheap.memory.mb"; +public static final Object TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB_SCHEMA = ConfigValidation.NonNegativeNumberValidator; + +/** + * The config indicates the percentage of cpu for a core. Assuming the a core value to be 100, a --- End diff -- will fix comment
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40731147 --- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java --- @@ -187,7 +187,7 @@ public static void submitTopology(String name, Map stormConf, StormTopology topo * @throws AuthorizationException */ --- End diff -- add comment
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40730611 --- Diff: storm-core/src/jvm/backtype/storm/generated/AlreadyAliveException.java --- @@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-2-6") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-9-18") --- End diff -- In general we try to not add in thrift changes that are just date changes.
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40730558 --- Diff: storm-core/src/jvm/backtype/storm/Config.java --- @@ -167,6 +167,16 @@ public static final Object STORM_CLUSTER_MODE_SCHEMA = String.class; /** + * What Network Topography detection classes should we use. + * Given a list of supervisor hostnames (or IP addresses), this class would return a list of + * rack names that correspond to the supervisors. This information is stored in Cluster.java, and + * is used in the resource aware scheduler. + */ +public static final String STORM_NETWORK_TOPOGRAPHY_PLUGIN = "storm.network.topography.plugin"; +public static final Object STORM_NETWORK_TOPOGRAPHY_PLUGIN_SCHEMA = String.class; + + --- End diff -- fix
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40730552 --- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java --- @@ -442,4 +444,36 @@ public static String submitJar(Map conf, String localJar, ProgressListener liste */ public void onCompleted(String srcFile, String targetFile, long totalBytes); } + + +private static void validateConfs(Map stormConf, StormTopology topology) throws IllegalArgumentException { +LOG.info("Validating storm Confs..."); +double largestMemReq = getMaxExecutorMemoryUsageForTopo(topology, stormConf); +Double topologyWorkerMaxHeapSize = Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB)); +if(topologyWorkerMaxHeapSize < largestMemReq) { +throw new IllegalArgumentException("Topology will not be able to be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB=" + +Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB)) + " < " ++ largestMemReq + " (Largest memory requirement of a component in the topology). Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount"); +} +} + + +private static double getMaxExecutorMemoryUsageForTopo(StormTopology topology, Map topologyConf) { +double largestMemoryOperator = 0.0; +for(Map entry : backtype.storm.scheduler.resource.Utils.getBoltsResources(topology, topologyConf).values()) { --- End diff -- Can we rename backtype.storm.scheduler.resource.Utils to something like ResourceUtils instead, just so there is no conflict with imports? I think it will make the code cleaner.
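To illustrate the naming point, here is a minimal sketch in which the resource helper is called `ResourceUtils`, so a caller could import it next to a general-purpose `Utils` class without writing out the full package name. The method name `largestMemoryRequirement`, the map layout, and the choice to add on-heap and off-heap values together are assumptions made for this example. Only the two config key strings come from the Config.java diff quoted in this thread.

```java
import java.util.Map;

// Hypothetical helper, named as suggested in the review so that it does not
// collide with another class called Utils when both are imported.
class ResourceUtils {
    static final String ONHEAP = "topology.component.resources.onheap.memory.mb";
    static final String OFFHEAP = "topology.component.resources.offheap.memory.mb";

    // componentResources: component id -> (config key -> value in MB).
    // Assumption: a component's requirement is its on-heap plus off-heap memory.
    static double largestMemoryRequirement(Map<String, Map<String, Double>> componentResources) {
        double largest = 0.0;
        for (Map<String, Double> res : componentResources.values()) {
            double total = res.getOrDefault(ONHEAP, 0.0) + res.getOrDefault(OFFHEAP, 0.0);
            largest = Math.max(largest, total);
        }
        return largest;
    }
}
```

A call site then reads `ResourceUtils.largestMemoryRequirement(...)` rather than a fully qualified `backtype.storm.scheduler.resource.Utils...` expression.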
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40730370 --- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java --- @@ -442,4 +444,36 @@ public static String submitJar(Map conf, String localJar, ProgressListener liste */ public void onCompleted(String srcFile, String targetFile, long totalBytes); } + + +private static void validateConfs(Map stormConf, StormTopology topology) throws IllegalArgumentException { +LOG.info("Validating storm Confs..."); --- End diff -- I don't think we need this log message.
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40730329 --- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java --- @@ -187,7 +187,7 @@ public static void submitTopology(String name, Map stormConf, StormTopology topo * @throws AuthorizationException */ --- End diff -- If we now throw an IllegalArgumentException, even though it is a RuntimeException, we should document it here and ideally describe the situation in which it is thrown. Otherwise why declare it at all?
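As an illustration of the documentation being asked for, the sketch below attaches an `@throws` tag to an unchecked exception and states when it is raised. The class name `SubmitSketch`, the two-argument signature, and the message wording are assumptions for this example. The real `validateConfs` in the diff takes the topology and its configuration instead.

```java
class SubmitSketch {
    /**
     * Checks that a single worker's heap limit can hold the largest component.
     *
     * @param workerMaxHeapMb configured maximum worker heap size, in MB
     * @param largestMemReqMb largest memory requirement of any one component, in MB
     * @throws IllegalArgumentException if workerMaxHeapMb is smaller than
     *         largestMemReqMb, because such a topology could never be scheduled
     */
    static void validateConfs(double workerMaxHeapMb, double largestMemReqMb) {
        if (workerMaxHeapMb < largestMemReqMb) {
            throw new IllegalArgumentException(
                    "Worker max heap " + workerMaxHeapMb + " MB < largest component requirement "
                            + largestMemReqMb + " MB; consider raising the worker heap limit");
        }
    }
}
```

Javadoc accepts `@throws` for unchecked exceptions, so callers can learn the failure condition from the generated documentation without reading the method body.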
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/746#discussion_r40730129 --- Diff: storm-core/src/jvm/backtype/storm/Config.java --- @@ -1101,6 +1127,33 @@ public static final Object TOPOLOGY_TASKS_SCHEMA = ConfigValidation.IntegerValidator; /** + * The maximum amount of memory an instance of a spout/bolt will take on heap. This enables the scheduler + * to allocate slots on machines with enough available memory. + */ +public static final String TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB = "topology.component.resources.onheap.memory.mb"; +public static final Object TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB_SCHEMA = ConfigValidation.NonNegativeNumberValidator; + +/** + * The maximum amount of memory an instance of a spout/bolt will take off heap. This enables the scheduler + * to allocate slots on machines with enough available memory. + */ +public static final String TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB = "topology.component.resources.offheap.memory.mb"; +public static final Object TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB_SCHEMA = ConfigValidation.NonNegativeNumberValidator; + +/** + * The config indicates the percentage of cpu for a core. Assuming the a core value to be 100, a --- End diff -- This is a bit confusing. I think you mean the percentage of a CPU core a component will take. All of the configs should probably mention that this is the default value for components that don't override the value.
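The "default value for components that don't override" behaviour can be sketched as a simple lookup with a fallback. This is an illustration only: the class `ComponentResourceDefaults`, its method names, and the `Map<String, Double>` shapes are hypothetical, and the on-heap key is the one quoted from Config.java above. The CPU helper assumes, per the quoted comment, that 100 stands for one full core.

```java
import java.util.Map;

// Hypothetical helper illustrating "topology-level config is the default,
// per-component settings override it".
class ComponentResourceDefaults {
    static final String ONHEAP = "topology.component.resources.onheap.memory.mb";

    // Returns the component's own value if it set one, else the topology-wide default.
    static double resolve(String key,
                          Map<String, Double> componentOverrides,
                          Map<String, Double> topologyConf) {
        Double own = componentOverrides.get(key);
        if (own != null) {
            return own;
        }
        Double fallback = topologyConf.get(key);
        if (fallback == null) {
            throw new IllegalStateException("No default configured for " + key);
        }
        return fallback;
    }

    // Assumption: a CPU value of 100 means one whole core, so 50 means half a core.
    static double coresFromPercent(double cpuPercent) {
        return cpuPercent / 100.0;
    }
}
```

Under this reading, a bolt that never declares a memory load is scheduled with the topology-wide value, and a bolt that does declare one is scheduled with its own number.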
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40729915

    --- Diff: storm-core/src/jvm/backtype/storm/Config.java ---
    @@ -167,6 +167,16 @@
         public static final Object STORM_CLUSTER_MODE_SCHEMA = String.class;
         /**
    +     * What Network Topography detection classes should we use.
    +     * Given a list of supervisor hostnames (or IP addresses), this class would return a list of
    +     * rack names that correspond to the supervisors. This information is stored in Cluster.java, and
    +     * is used in the resource aware scheduler.
    +     */
    +    public static final String STORM_NETWORK_TOPOGRAPHY_PLUGIN = "storm.network.topography.plugin";
    +    public static final Object STORM_NETWORK_TOPOGRAPHY_PLUGIN_SCHEMA = String.class;
    +
    +
    --- End diff --

    Extra line of whitespace.
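As a rough picture of the contract described in that Javadoc (hostnames in, rack names out), a plugin could be sketched as follows. The interface `RackResolverSketch`, the class `StaticRackResolver`, and the `/default-rack` fallback are all hypothetical; the diff does not show the actual plugin interface, so none of this should be read as Storm's API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical interface: maps supervisor hosts to rack names, position by position.
interface RackResolverSketch {
    List<String> resolve(List<String> hosts);
}

// Hypothetical implementation backed by a fixed host-to-rack table.
final class StaticRackResolver implements RackResolverSketch {
    // Assumed fallback for hosts missing from the table.
    static final String DEFAULT_RACK = "/default-rack";

    private final Map<String, String> hostToRack;

    StaticRackResolver(Map<String, String> hostToRack) {
        this.hostToRack = new HashMap<>(hostToRack); // defensive copy
    }

    @Override
    public List<String> resolve(List<String> hosts) {
        List<String> racks = new ArrayList<>(hosts.size());
        for (String host : hosts) {
            racks.add(hostToRack.getOrDefault(host, DEFAULT_RACK));
        }
        return racks;
    }
}
```

The returned list has one entry per input host, in the same order, which is what lets a scheduler pair each supervisor with its rack.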
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/746#discussion_r40729530

    --- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
    @@ -308,15 +308,15 @@
     (defn- all-supervisor-info
     ([storm-cluster-state] (all-supervisor-info storm-cluster-state nil))
     ([storm-cluster-state callback]
    - (let [supervisor-ids (.supervisors storm-cluster-state callback)]
    - (into {}
    - (mapcat
    - (fn [id]
    -(if-let [info (.supervisor-info storm-cluster-state id)]
    - [[id info]]
    - ))
    - supervisor-ids))
    - )))
    +(let [supervisor-ids (.supervisors storm-cluster-state callback)]
    + (into {}
    +(mapcat
    + (fn [id]
    +(if-let [info (.supervisor-info storm-cluster-state id)]
    + [[id info]]
    + ))
    + supervisor-ids))
    + )))
    --- End diff --

    If these are just whitespace changes, I think we should probably revert them.
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/746#issuecomment-144190888

    Well, first of all, you need to upmerge at some point soon.
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
Github user jerrypeng commented on the pull request:

    https://github.com/apache/storm/pull/746#issuecomment-144111799

    @HeartSaVioR @revans2 Can I get a review for my PR? Thanks!
[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling
GitHub user jerrypeng opened a pull request:

    https://github.com/apache/storm/pull/746

    [STORM-893] Resource Aware Scheduling

    I have created an initial open source implementation of the Resource Aware Scheduler as described in the paper I published:

    web.engr.illinois.edu/~bpeng/files/r-storm.pdf

    The paper describes the general architecture, concepts, and algorithms used.

    I have written an example topology that demonstrates how to use the API I have written to specify resource requirements in your topology. Currently, the user can specify only two types of resources: memory and CPU. We plan to add support for more resources in the future. There is currently no built-in enforcement of resource usage in worker processes; however, we plan to add that functionality via CGroups.

    People who worked on the implementation at Yahoo with me:

    Bobby Evans (Yahoo & Storm PMC)
    Derek Dagit (Yahoo & Storm PMC)
    Kyle Nusbaum (Yahoo & Storm PMC)
    Liu Zhuo (Yahoo)
    Sanket Chintapalli (Yahoo)
    Reza Fravier (Yahoo & UIUC)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jerrypeng/storm opensource_ras

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/746.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #746

commit e014804a68b9c6887c64b9d03a17929863b80909
Author: Boyang Jerry Peng
Date:   2015-09-17T20:50:44Z

    [STORM-893] - Resource Aware Scheduler implementation
    [STORM-894] - Basic Resource Aware Scheduling implementation.

commit 0f3f237f158218cbe46ffc59670f300875eb7950
Author: Boyang Jerry Peng
Date:   2015-09-17T20:02:44Z

    Added functionality for users to limit the amount of memory resources allocated to a worker (JVM) process when scheduling with resource aware scheduler. This allows users to potentially spread executors more evenly across workers.

    Also refactored some code

commit 28ee867c5a27c220be562689e61f4bb959a1aa62
Author: Boyang Jerry Peng
Date:   2015-09-17T20:56:23Z

    regenerated thrift

commit 0256f6baff63e8c394ebfddbb68d38de5893b053
Author: Boyang Jerry Peng
Date:   2015-09-18T18:23:46Z

    adding miscellaneous things
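The effect of the per-worker memory limit mentioned in the second commit can be illustrated with a small sketch. This is a plain first-fit packing written for this note, not the scheduling algorithm from the PR or the paper; the class `WorkerPackingSketch` and its `pack` method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical first-fit packing; shown only to illustrate the effect of a worker memory cap.
final class WorkerPackingSketch {

    /**
     * Assigns each executor's memory request (in MB) to the first worker that
     * still has room under maxWorkerMb, opening a new worker when none fits.
     */
    static List<List<Double>> pack(List<Double> executorMb, double maxWorkerMb) {
        List<List<Double>> workers = new ArrayList<>();
        List<Double> used = new ArrayList<>(); // memory already assigned per worker
        for (double mb : executorMb) {
            if (mb > maxWorkerMb) {
                throw new IllegalArgumentException(
                        "executor needs " + mb + " MB but the worker cap is " + maxWorkerMb + " MB");
            }
            int target = -1;
            for (int i = 0; i < workers.size(); i++) {
                if (used.get(i) + mb <= maxWorkerMb) {
                    target = i;
                    break;
                }
            }
            if (target < 0) { // no existing worker has room
                workers.add(new ArrayList<>());
                used.add(0.0);
                target = workers.size() - 1;
            }
            workers.get(target).add(mb);
            used.set(target, used.get(target) + mb);
        }
        return workers;
    }
}
```

With four executors of 256 MB each, a 1024 MB cap places them all in one worker, while a 512 MB cap yields two workers with two executors apiece, which is the "spread executors more evenly" effect the commit message describes.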