[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-09 Thread jerrypeng
Github user jerrypeng commented on the pull request:

https://github.com/apache/storm/pull/746#issuecomment-146896112
  
@revans2  thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-09 Thread revans2
Github user revans2 commented on the pull request:

https://github.com/apache/storm/pull/746#issuecomment-146895509
  
@jerrypeng I merged this into master.  Looking at the sub tasks for 
STORM-893, this looked more like STORM-894.




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/746




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-09 Thread jerrypeng
Github user jerrypeng commented on the pull request:

https://github.com/apache/storm/pull/746#issuecomment-146880546
  
@revans2 Thank you!




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-09 Thread revans2
Github user revans2 commented on the pull request:

https://github.com/apache/storm/pull/746#issuecomment-146880374
  
@jerrypeng Technically with one +1 from a committer and 24 hours we can 
merge this in.  But this is a large change so I am glad to see more people 
looking at it.  I'll try to go through it again and hopefully merge it in 
shortly.  




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-09 Thread jerrypeng
Github user jerrypeng commented on the pull request:

https://github.com/apache/storm/pull/746#issuecomment-146875059
  
I have two +1s now, can we merge?




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-08 Thread zhuoliu
Github user zhuoliu commented on the pull request:

https://github.com/apache/storm/pull/746#issuecomment-146692455
  
Nice work! +1




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-07 Thread jerrypeng
Github user jerrypeng commented on the pull request:

https://github.com/apache/storm/pull/746#issuecomment-146266179
  
@erikdw thank you for your review.  I reformatted some of the files based on 
your suggestions.




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-07 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r41412262
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/Cluster.java ---
@@ -438,6 +451,35 @@ public SupervisorDetails getSupervisorById(String nodeId) {
 return this.supervisors;
 }
 
+/*
+* Note: Make sure the proper conf was passed into the Cluster constructor before calling this function
+* It tries to load the proper network topography detection plugin specified in the config.
+*/
+public Map<String, List<String>> getNetworkTopography() {
+if (networkTopography == null) {
+networkTopography = new HashMap<String, List<String>>();
+ArrayList<String> supervisorHostNames = new ArrayList<String>();
+for (SupervisorDetails s : supervisors.values()) {
+supervisorHostNames.add(s.getHost());
+}
+
+String clazz = (String) conf.get(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN);
+DNSToSwitchMapping topographyMapper = (DNSToSwitchMapping) Utils.newInstance(clazz);
+
+Map <String, String> resolvedSuperVisors = topographyMapper.resolve(supervisorHostNames);
+for(String hostName: resolvedSuperVisors.keySet()) {
--- End diff --

will fix




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-07 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r41410090
  
--- Diff: 
storm-core/src/jvm/backtype/storm/networktopography/DNSToSwitchMapping.java ---
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.networktopography;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An interface that must be implemented to allow pluggable
+ * DNS-name/IP-address to RackID resolvers.
+ *
+ */
+public interface DNSToSwitchMapping {
+  public final static String DEFAULT_RACK = "/default-rack";
--- End diff --

will reformat




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-07 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r41409816
  
--- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java 
---
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies;
+
+import java.util.Collection;
+import java.util.Map;
+
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.ExecutorDetails;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.scheduler.resource.RAS_Node;
+
+/**
+ * An interface to for implementing different scheduling strategies for 
the resource aware scheduling
+ * In the future stategies will be pluggable
+ */
+public interface IStrategy {
+
+   public Map<WorkerSlot, Collection<ExecutorDetails>> schedule(TopologyDetails td);
--- End diff --

will reformat




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-07 Thread erikdw
Github user erikdw commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r41366711
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/Cluster.java ---
@@ -438,6 +451,35 @@ public SupervisorDetails getSupervisorById(String nodeId) {
 return this.supervisors;
 }
 
+/*
+* Note: Make sure the proper conf was passed into the Cluster constructor before calling this function
+* It tries to load the proper network topography detection plugin specified in the config.
+*/
+public Map<String, List<String>> getNetworkTopography() {
+if (networkTopography == null) {
+networkTopography = new HashMap<String, List<String>>();
+ArrayList<String> supervisorHostNames = new ArrayList<String>();
+for (SupervisorDetails s : supervisors.values()) {
+supervisorHostNames.add(s.getHost());
+}
+
+String clazz = (String) conf.get(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN);
+DNSToSwitchMapping topographyMapper = (DNSToSwitchMapping) Utils.newInstance(clazz);
+
+Map <String, String> resolvedSuperVisors = topographyMapper.resolve(supervisorHostNames);
+for(String hostName: resolvedSuperVisors.keySet()) {
--- End diff --

Please use consistent for/if/while styling.  i.e., most of your new code is 
`for(`, but some (see 9 lines above) is `for (`.   It *seems* like the storm 
project is more consistently using `for (`.
This comment also applies to the `){` at the end of various lines.  That 
should always be `) {` IMNSHO.
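
For illustration only, here is a minimal, self-contained sketch of the spacing asked for above (`for (` and `) {`), applied to code shaped like the quoted hunk. `RackGrouping` and `groupByRack` are hypothetical names that are not part of the pull request, and the host-to-rack map shape is an assumption about what `resolve` returns.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of the pull request.
class RackGrouping {
    // Inverts a host -> rack map (the shape resolve() is assumed to return)
    // into a rack -> hosts map.
    static Map<String, List<String>> groupByRack(Map<String, String> hostToRack) {
        Map<String, List<String>> rackToHosts = new HashMap<String, List<String>>();
        for (String hostName : hostToRack.keySet()) {
            String rack = hostToRack.get(hostName);
            List<String> hosts = rackToHosts.get(rack);
            if (hosts == null) {
                hosts = new ArrayList<String>();
                rackToHosts.put(rack, hosts);
            }
            hosts.add(hostName);
        }
        return rackToHosts;
    }
}
```

Every `for` and `if` is followed by a single space, and every closing parenthesis is separated from `{` by a single space.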




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-07 Thread erikdw
Github user erikdw commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r41366483
  
--- Diff: 
storm-core/src/jvm/backtype/storm/networktopography/DNSToSwitchMapping.java ---
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.networktopography;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An interface that must be implemented to allow pluggable
+ * DNS-name/IP-address to RackID resolvers.
+ *
+ */
+public interface DNSToSwitchMapping {
+  public final static String DEFAULT_RACK = "/default-rack";
--- End diff --

please use consistent indentation.  The java code in the storm project 
seems to use 4-space indents.  This applies to this file as well as 
AbstractDNSToSwitchMapping.java.  Maybe others that I've missed with all the 
generated code making this a bit hard to read through.
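
As a rough sketch of what a pluggable resolver could look like with 4-space indentation: only `DEFAULT_RACK` and the `resolve(...)` call site are visible in the quoted diffs, so the method signature below is an assumption, and `RackResolver` and `DefaultRackResolver` are hypothetical stand-ins rather than the PR's classes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for the interface under review. The resolve(...) signature is
// assumed: it maps each host name or IP address to a rack ID.
interface RackResolver {
    String DEFAULT_RACK = "/default-rack";

    Map<String, String> resolve(List<String> names);
}

// Hypothetical resolver that places every host on the default rack.
class DefaultRackResolver implements RackResolver {
    @Override
    public Map<String, String> resolve(List<String> names) {
        Map<String, String> hostToRack = new HashMap<String, String>();
        for (String name : names) {
            hostToRack.put(name, DEFAULT_RACK);
        }
        return hostToRack;
    }
}
```

A rack-aware deployment would presumably replace the body of `resolve` with a lookup against its own inventory, while keeping the same map shape.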




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-07 Thread erikdw
Github user erikdw commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r41366160
  
--- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java 
---
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies;
+
+import java.util.Collection;
+import java.util.Map;
+
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.ExecutorDetails;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.scheduler.resource.RAS_Node;
+
+/**
+ * An interface to for implementing different scheduling strategies for 
the resource aware scheduling
+ * In the future stategies will be pluggable
+ */
+public interface IStrategy {
+
+   public Map<WorkerSlot, Collection<ExecutorDetails>> schedule(TopologyDetails td);
--- End diff --

this is either a tab (evil) or 8 spaces.  It seems most of this java code 
has 4 space indents.




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-06 Thread jerrypeng
Github user jerrypeng commented on the pull request:

https://github.com/apache/storm/pull/746#issuecomment-146066442
  
@zhuoliu Thanks for your review.  I just pushed a commit that I think 
addresses all your comments.




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-06 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r41347393
  
--- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
 ---
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.ExecutorDetails;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.scheduler.resource.Component;
+import backtype.storm.scheduler.resource.RAS_Node;
+
+public class ResourceAwareStrategy implements IStrategy {
+private Logger LOG = null;
+private Topologies _topologies;
+private Cluster _cluster;
+//Map key is the supervisor id and the value is the corresponding RAS_Node Object 
+private Map<String, RAS_Node> _availNodes;
+private RAS_Node refNode = null;
+/**
+ * supervisor id -> Node
+ */
+private Map<String, RAS_Node> _nodes;
+private Map<String, List<String>> _clusterInfo;
+
+private final double CPU_WEIGHT = 1.0;
+private final double MEM_WEIGHT = 1.0;
+private final double NETWORK_WEIGHT = 1.0;
+
+public ResourceAwareStrategy(Cluster cluster, Topologies topologies) {
+_topologies = topologies;
+_cluster = cluster;
+_nodes = RAS_Node.getAllNodesFrom(cluster, _topologies);
+_availNodes = this.getAvailNodes();
+this.LOG = LoggerFactory.getLogger(this.getClass());
+_clusterInfo = cluster.getNetworkTopography();
+LOG.debug(this.getClusterInfo());
+}
+
+//the returned TreeMap keeps the Components sorted
+private TreeMap<Integer, List<ExecutorDetails>> getPriorityToExecutorDetailsListMap(
+Queue<Component> ordered__Component_list, Collection<ExecutorDetails> unassignedExecutors) {
+TreeMap<Integer, List<ExecutorDetails>> retMap = new TreeMap<Integer, List<ExecutorDetails>>();
+Integer rank = 0;
+for (Component ras_comp : ordered__Component_list) {
+retMap.put(rank, new ArrayList<ExecutorDetails>());
+for(ExecutorDetails exec : ras_comp.execs) {
+if(unassignedExecutors.contains(exec)) {
+retMap.get(rank).add(exec);
+}
+}
+rank++;
+}
+return retMap;
+}
+
+public Map<WorkerSlot, Collection<ExecutorDetails>> schedule(TopologyDetails td) {
+if (_availNodes.size() <= 0) {
+LOG.warn("No available nodes to schedule tasks on!");
+return null;
+}
+Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td);
+Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<WorkerSlot, Collection<ExecutorDetails>>();
+LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
+Collection<ExecutorDetails> scheduledTasks = new ArrayList<ExecutorDetails>();
+List<Component> spouts = this.getSpouts(_topologies, td);
+
+if (spouts.size() == 0) {
+LOG.error("Cannot find a Spout!");
+return null;
+}
+
+Queue<Component> ordered__Component_list = bfs(_topologies, td, spouts);
+
+Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(ordered__Component_list, unassignedExecutors);
+Collection<ExecutorDetails> executorsNotScheduled = new HashSet<ExecutorDetails>(unassignedExecutors);
+Integer longestPriorityListSize = this.getLongestPriorityListSize(priorityToExecutorMap);
+//Pick the first executor with priority one, then the 1st exec 
with prior
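
The ranking step in `getPriorityToExecutorDetailsListMap` quoted above can be sketched in isolation as follows. This is a simplified illustration under stated assumptions, not the PR's code: `PriorityRanking` is a hypothetical name, and components and executors are reduced to lists of strings.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.TreeMap;

// Hypothetical stand-alone version of the ranking step. Each inner list
// stands for the executors of one component, in traversal order.
class PriorityRanking {
    static TreeMap<Integer, List<String>> rank(
            List<List<String>> orderedComponents, Collection<String> unassigned) {
        TreeMap<Integer, List<String>> ranked = new TreeMap<Integer, List<String>>();
        int rank = 0;
        for (List<String> componentExecs : orderedComponents) {
            // Keep only the executors that still need a slot.
            List<String> pending = new ArrayList<String>();
            for (String exec : componentExecs) {
                if (unassigned.contains(exec)) {
                    pending.add(exec);
                }
            }
            ranked.put(rank, pending);
            rank++;
        }
        return ranked;
    }
}
```

Because the result is a `TreeMap` keyed by rank, iterating over it visits components in the same order in which they were passed in.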

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-06 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r41347451
  
--- Diff: storm-core/src/jvm/backtype/storm/utils/Utils.java ---
@@ -18,10 +18,7 @@
 package backtype.storm.utils;
 
 import backtype.storm.Config;
-import backtype.storm.generated.AuthorizationException;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.ComponentObject;
-import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.*;
--- End diff --

will expand




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-06 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r41347349
  
--- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
 ---
@@ -0,0 +1,478 @@

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-06 Thread zhuoliu
Github user zhuoliu commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r41285663
  
--- Diff: storm-core/src/jvm/backtype/storm/utils/Utils.java ---
@@ -18,10 +18,7 @@
 package backtype.storm.utils;
 
 import backtype.storm.Config;
-import backtype.storm.generated.AuthorizationException;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.ComponentObject;
-import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.*;
--- End diff --

Maybe expand this wildcard import back into explicit imports.




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-06 Thread zhuoliu
Github user zhuoliu commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r41283467
  
--- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
 ---
@@ -0,0 +1,478 @@

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-06 Thread zhuoliu
Github user zhuoliu commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r41283265
  
--- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
 ---
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.ExecutorDetails;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.scheduler.resource.Component;
+import backtype.storm.scheduler.resource.RAS_Node;
+
+public class ResourceAwareStrategy implements IStrategy {
+    private Logger LOG = null;
+    private Topologies _topologies;
+    private Cluster _cluster;
+    //Map key is the supervisor id and the value is the corresponding RAS_Node Object
+    private Map<String, RAS_Node> _availNodes;
+    private RAS_Node refNode = null;
+    /**
+     * supervisor id -> Node
+     */
+    private Map<String, RAS_Node> _nodes;
+    private Map<String, List<String>> _clusterInfo;
+
+    private final double CPU_WEIGHT = 1.0;
+    private final double MEM_WEIGHT = 1.0;
+    private final double NETWORK_WEIGHT = 1.0;
+
+    public ResourceAwareStrategy(Cluster cluster, Topologies topologies) {
+        _topologies = topologies;
+        _cluster = cluster;
+        _nodes = RAS_Node.getAllNodesFrom(cluster, _topologies);
+        _availNodes = this.getAvailNodes();
+        this.LOG = LoggerFactory.getLogger(this.getClass());
+        _clusterInfo = cluster.getNetworkTopography();
+        LOG.debug(this.getClusterInfo());
+    }
+
+    //the returned TreeMap keeps the Components sorted
+    private TreeMap<Integer, List<ExecutorDetails>> getPriorityToExecutorDetailsListMap(
+            Queue<Component> ordered__Component_list, Collection<ExecutorDetails> unassignedExecutors) {
+        TreeMap<Integer, List<ExecutorDetails>> retMap = new TreeMap<Integer, List<ExecutorDetails>>();
+        Integer rank = 0;
+        for (Component ras_comp : ordered__Component_list) {
+            retMap.put(rank, new ArrayList<ExecutorDetails>());
+            for(ExecutorDetails exec : ras_comp.execs) {
+                if(unassignedExecutors.contains(exec)) {
+                    retMap.get(rank).add(exec);
+                }
+            }
+            rank++;
+        }
+        return retMap;
+    }
+
+    public Map<WorkerSlot, Collection<ExecutorDetails>> schedule(TopologyDetails td) {
+        if (_availNodes.size() <= 0) {
+            LOG.warn("No available nodes to schedule tasks on!");
+            return null;
+        }
+        Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td);
+        Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<WorkerSlot, Collection<ExecutorDetails>>();
+        LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
+        Collection<ExecutorDetails> scheduledTasks = new ArrayList<ExecutorDetails>();
+        List<Component> spouts = this.getSpouts(_topologies, td);
+
+        if (spouts.size() == 0) {
+            LOG.error("Cannot find a Spout!");
+            return null;
+        }
+
+        Queue<Component> ordered__Component_list = bfs(_topologies, td, spouts);
+
+        Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(ordered__Component_list, unassignedExecutors);
+        Collection<ExecutorDetails> executorsNotScheduled = new HashSet<ExecutorDetails>(unassignedExecutors);
+        Integer longestPriorityListSize = this.getLongestPriorityListSize(priorityToExecutorMap);
+        //Pick the first executor with priority one, then the 1st exec with priorit

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-03 Thread jerrypeng
Github user jerrypeng commented on the pull request:

https://github.com/apache/storm/pull/746#issuecomment-145261204
  
@revans2 Thanks again for your review! I just pushed a commit that contains 
the revisions you suggested.




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-03 Thread revans2
Github user revans2 commented on the pull request:

https://github.com/apache/storm/pull/746#issuecomment-145253628
  
@jerrypeng Just two more issues, both of which are relatively minor. Once 
they are addressed, I am +1 for merging this in. Then hopefully development 
work on RAS can move forward in open source.




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-03 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r41087631
  
--- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
 ---
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.ExecutorDetails;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.scheduler.resource.Component;
+import backtype.storm.scheduler.resource.RAS_Node;
+
+public class ResourceAwareStrategy implements IStrategy {
+    private Logger LOG = null;
+    private Topologies _topologies;
+    private Cluster _cluster;
+    //Map key is the supervisor id and the value is the corresponding RAS_Node Object
+    private Map<String, RAS_Node> _availNodes;
+    private RAS_Node refNode = null;
+    /**
+     * supervisor id -> Node
+     */
+    private Map<String, RAS_Node> _nodes;
+    private Map<String, List<String>> _clusterInfo;
+
+    private final double CPU_WEIGHT = 1.0;
+    private final double MEM_WEIGHT = 1.0;
+    private final double NETWORK_WEIGHT = 1.0;
+
+    public ResourceAwareStrategy(Cluster cluster, Topologies topologies) {
+        _topologies = topologies;
+        _cluster = cluster;
+        _nodes = RAS_Node.getAllNodesFrom(cluster, _topologies);
+        _availNodes = this.getAvailNodes();
+        this.LOG = LoggerFactory.getLogger(this.getClass());
+        _clusterInfo = cluster.getNetworkTopography();
+        LOG.info(this.getClusterInfo());
+    }
+
+    //the returned TreeMap keeps the Components sorted
+    private TreeMap<Integer, List<ExecutorDetails>> getPriorityToExecutorDetailsListMap(
+            Queue<Component> ordered__Component_list, Collection<ExecutorDetails> unassignedExecutors) {
+        TreeMap<Integer, List<ExecutorDetails>> retMap = new TreeMap<Integer, List<ExecutorDetails>>();
+        Integer rank = 0;
+        for (Component ras_comp : ordered__Component_list) {
+            retMap.put(rank, new ArrayList<ExecutorDetails>());
+            for(ExecutorDetails exec : ras_comp.execs) {
+                if(unassignedExecutors.contains(exec)) {
+                    retMap.get(rank).add(exec);
+                }
+            }
+            rank++;
+        }
+        return retMap;
+    }
+
+    public Map<WorkerSlot, Collection<ExecutorDetails>> schedule(TopologyDetails td) {
+        if (_availNodes.size() <= 0) {
+            LOG.warn("No available nodes to schedule tasks on!");
+            return null;
+        }
+        Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td);
+        Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<WorkerSlot, Collection<ExecutorDetails>>();
+        LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
+        Collection<ExecutorDetails> scheduledTasks = new ArrayList<ExecutorDetails>();
+        List<Component> spouts = this.getSpouts(_topologies, td);
+
+        if (spouts.size() == 0) {
+            LOG.error("Cannot find a Spout!");
+            return null;
+        }
+
+        Queue<Component> ordered__Component_list = bfs(_topologies, td, spouts);
+
+        Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(ordered__Component_list, unassignedExecutors);
+        Collection<ExecutorDetails> executorsNotScheduled = new HashSet<ExecutorDetails>(unassignedExecutors);
+        Integer longestPriorityListSize = this.getLongestPriorityListSize(priorityToExecutorMap);
+        //Pick the first executor with priority one, then the 1st exec with priority

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-03 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r41087614
  
--- Diff: 
storm-core/src/jvm/backtype/storm/networktopography/DefaultRackDNSToSwitchMapping.java
 ---
@@ -0,0 +1,39 @@
+package backtype.storm.networktopography;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class implements the {@link DNSToSwitchMapping} interface
+ * It returns the DEFAULT_RACK for every host.
+ */
+public final class DefaultRackDNSToSwitchMapping extends CachedDNSToSwitchMapping {
--- End diff --

I know we borrowed this from Hadoop, and it looks like Hadoop has the same 
issue, but we cannot subclass CachedDNSToSwitchMapping to get caching.  It does 
not work that way.  Please either fix CachedDNSToSwitchMapping to actually do 
caching when it is a parent class, or remove it altogether and have its 
children inherit directly from AbstractDNSToSwitchMapping.
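
To illustrate the concern, here is a minimal sketch. All class names in it are hypothetical stand-ins, not the Storm or Hadoop classes, and it assumes that the caching parent only caches calls that pass through its own resolve() and that the subclass overrides resolve() directly:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration; NOT the Storm or Hadoop classes.
interface SwitchMapping {
    List<String> resolve(List<String> names);
}

// Caching decorator: results are cached only for calls that go through this resolve().
class CachingMapping implements SwitchMapping {
    private final SwitchMapping raw;
    private final Map<String, String> cache = new HashMap<String, String>();

    CachingMapping(SwitchMapping raw) {
        this.raw = raw;
    }

    @Override
    public List<String> resolve(List<String> names) {
        List<String> racks = new ArrayList<String>();
        for (String name : names) {
            String rack = cache.get(name);
            if (rack == null) {
                // Cache miss: ask the wrapped mapping once and remember the answer.
                rack = raw.resolve(Collections.singletonList(name)).get(0);
                cache.put(name, rack);
            }
            racks.add(rack);
        }
        return racks;
    }
}

// Pitfall (assumed shape): the subclass overrides resolve(), so the parent's cache is bypassed.
class SubclassedDefaultRack extends CachingMapping {
    int lookups = 0;

    SubclassedDefaultRack() {
        super(null); // there is no raw mapping to delegate to
    }

    @Override
    public List<String> resolve(List<String> names) {
        List<String> racks = new ArrayList<String>();
        for (String name : names) {
            lookups++;
            racks.add("/default-rack");
        }
        return racks;
    }
}

// Alternative: keep the lookup logic in a plain mapping and wrap it with the decorator.
class RawDefaultRack implements SwitchMapping {
    int lookups = 0;

    @Override
    public List<String> resolve(List<String> names) {
        List<String> racks = new ArrayList<String>();
        for (String name : names) {
            lookups++;
            racks.add("/default-rack");
        }
        return racks;
    }
}

class CachingDemo {
    // Resolves the same host twice through the subclass and counts real lookups.
    static int subclassLookups() {
        SubclassedDefaultRack mapping = new SubclassedDefaultRack();
        mapping.resolve(Arrays.asList("host1"));
        mapping.resolve(Arrays.asList("host1"));
        return mapping.lookups;
    }

    // Resolves the same host twice through the decorator and counts real lookups.
    static int wrappedLookups() {
        RawDefaultRack raw = new RawDefaultRack();
        SwitchMapping mapping = new CachingMapping(raw);
        mapping.resolve(Arrays.asList("host1"));
        mapping.resolve(Arrays.asList("host1"));
        return raw.lookups;
    }

    public static void main(String[] args) {
        System.out.println("subclass lookups: " + subclassLookups()); // 2, nothing was cached
        System.out.println("wrapped lookups: " + wrappedLookups());   // 1, second call hit the cache
    }
}
```

In this sketch the subclass performs a real lookup on every call, while the wrapped mapping is consulted only once per host. Whether the actual classes in this patch behave the same way depends on how their resolve() methods are written.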




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-02 Thread jerrypeng
Github user jerrypeng commented on the pull request:

https://github.com/apache/storm/pull/746#issuecomment-145155422
  
@knusbaum 
@rfarivar 
@zhuoliu 




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-02 Thread jerrypeng
Github user jerrypeng commented on the pull request:

https://github.com/apache/storm/pull/746#issuecomment-145152436
  
Done making modifications based on comments. Can I get some more reviews, 
please?




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-10-02 Thread jerrypeng
Github user jerrypeng commented on the pull request:

https://github.com/apache/storm/pull/746#issuecomment-145152237
  
@revans2 Thanks for the review!




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40850849
  
--- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java 
---
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies;
+
+import java.util.Collection;
+import java.util.Map;
+
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.ExecutorDetails;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.scheduler.resource.RAS_Node;
+
+public interface IStrategy {
--- End diff --

will add




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40850542
  
--- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java ---
@@ -0,0 +1,117 @@
+package backtype.storm.scheduler.resource;
+
+import backtype.storm.Config;
+import backtype.storm.generated.Bolt;
+import backtype.storm.generated.SpoutSpec;
+import backtype.storm.generated.StormTopology;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Created by jerrypeng on 9/22/15.
+ */
+public class ResourceUtils {
--- End diff --

will remove




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40850487
  
--- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java ---
@@ -0,0 +1,117 @@
+package backtype.storm.scheduler.resource;
--- End diff --

will add




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40850307
  
--- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
 ---
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource;
+
+import java.util.*;
+
+import backtype.storm.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.ExecutorDetails;
+import backtype.storm.scheduler.IScheduler;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.scheduler.resource.strategies.ResourceAwareStrategy;
+
+public class ResourceAwareScheduler implements IScheduler {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(ResourceAwareScheduler.class);
+    @SuppressWarnings("rawtypes")
+    private Map _conf;
+
+    @Override
+    public void prepare(Map conf) {
+        _conf = conf;
+    }
+
+    @Override
+    public void schedule(Topologies topologies, Cluster cluster) {
+        LOG.info("\n\n\nRerunning ResourceAwareScheduler...");
--- End diff --

will remove/change




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40849566
  
--- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
 ---
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource;
+
+import java.util.*;
--- End diff --

will change




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40849297
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java 
---
@@ -79,11 +95,302 @@ public StormTopology getTopology() {
                 ret.put(executor, compId);
             }
         }
-
+
         return ret;
     }
-
+
     public Collection<ExecutorDetails> getExecutors() {
         return this.executorToComponent.keySet();
     }
+
+    private void initResourceList() {
+        _resourceList = new HashMap<ExecutorDetails, Map<String, Double>>();
+        // Extract bolt memory info
+        if (this.topology.get_bolts() != null) {
+            for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
+                //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
+                Map<String, Double> topology_resources = ResourceUtils.parseResources(bolt
+                        .getValue().get_common().get_json_conf());
+                ResourceUtils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf);
+                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
+                    if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
+                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
+                    }
+                }
+            }
+        }
+        // Extract spout memory info
+        if (this.topology.get_spouts() != null) {
+            for (Map.Entry<String, SpoutSpec> spout : this.topology.get_spouts().entrySet()) {
+                Map<String, Double> topology_resources = ResourceUtils.parseResources(spout
+                        .getValue().get_common().get_json_conf());
+                ResourceUtils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf);
+                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
+                    if (spout.getKey().equals(anExecutorToComponent.getValue())) {
+                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
+                    }
+                }
+            }
+        } else {
+            LOG.warn("Topology " + topologyId + " does not seem to have any spouts!");
+        }
+        //schedule tasks that are not part of components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks)
+        for(ExecutorDetails exec : this.getExecutors()) {
+            if (_resourceList.containsKey(exec) == false) {
+                LOG.debug(
+                        "Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}",
+                        this.getExecutorToComponent().get(exec),
+                        exec,
+                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
+                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
+                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
+                this.addDefaultResforExec(exec);
+            }
+        }
+    }
+
+    private List<ExecutorDetails> componentToExecs(String comp) {
+        List<ExecutorDetails> execs = new ArrayList<>();
+        for (Map.Entry<ExecutorDetails, String> entry : executorToComponent.entrySet()) {
+            if (entry.getValue().equals(comp)) {
+                execs.add(entry.getKey());
+            }
+        }
+        return execs;
+    }
+
+    /**
+     * Returns a representation of the non-system components of the topology graph
+     * Each Component object in the returning map is populated with the list of its
+     * parents, children and execs assigned to that component.
+     * @return a map of components
+     */
+    public Map<String, Component> getComponents() {
+        Map<String, Component> all_comp = new HashMap<String, Component>();
+
+        StormTopology storm_topo = this.topology;
+        // spouts
+        if (storm_topo.get_spouts() != null) {
+            for (Map.Entry<String, SpoutSpec> spoutEntry : storm_topo
+                    .get_spouts().entrySet()) {
+                if (!Utils.isSystemId(spoutEntry.getKey())) {
+                    Component newComp = null;
+                    if (all_comp.containsKey(spoutEntry.getKey())) {
+                        newComp = all_comp.get(spoutEntry.getKey());
+                        newComp.execs = componentToExecs(newComp.id);
+                    } else {
+                        newComp = new Component(spoutEntry.getKey());
+                        newComp.execs = componentToExecs(newComp.i

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40849422
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java 
---
@@ -79,11 +95,302 @@ public StormTopology getTopology() {
                 ret.put(executor, compId);
             }
         }
-
+
         return ret;
     }
-
+
     public Collection<ExecutorDetails> getExecutors() {
         return this.executorToComponent.keySet();
     }
+
+    private void initResourceList() {
+        _resourceList = new HashMap<ExecutorDetails, Map<String, Double>>();
+        // Extract bolt memory info
+        if (this.topology.get_bolts() != null) {
+            for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
+                //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
+                Map<String, Double> topology_resources = ResourceUtils.parseResources(bolt
+                        .getValue().get_common().get_json_conf());
+                ResourceUtils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf);
+                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
+                    if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
+                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
+                    }
+                }
+            }
+        }
+        // Extract spout memory info
+        if (this.topology.get_spouts() != null) {
+            for (Map.Entry<String, SpoutSpec> spout : this.topology.get_spouts().entrySet()) {
+                Map<String, Double> topology_resources = ResourceUtils.parseResources(spout
+                        .getValue().get_common().get_json_conf());
+                ResourceUtils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf);
+                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
+                    if (spout.getKey().equals(anExecutorToComponent.getValue())) {
+                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
+                    }
+                }
+            }
+        } else {
+            LOG.warn("Topology " + topologyId + " does not seem to have any spouts!");
+        }
+        //schedule tasks that are not part of components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks)
+        for(ExecutorDetails exec : this.getExecutors()) {
+            if (_resourceList.containsKey(exec) == false) {
+                LOG.debug(
+                        "Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}",
+                        this.getExecutorToComponent().get(exec),
+                        exec,
+                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
+                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
+                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
+                this.addDefaultResforExec(exec);
+            }
+        }
+    }
+
+    private List<ExecutorDetails> componentToExecs(String comp) {
+        List<ExecutorDetails> execs = new ArrayList<>();
+        for (Map.Entry<ExecutorDetails, String> entry : executorToComponent.entrySet()) {
+            if (entry.getValue().equals(comp)) {
+                execs.add(entry.getKey());
+            }
+        }
+        return execs;
+    }
+
+    /**
+     * Returns a representation of the non-system components of the topology graph
+     * Each Component object in the returning map is populated with the list of its
+     * parents, children and execs assigned to that component.
+     * @return a map of components
+     */
+    public Map<String, Component> getComponents() {
+        Map<String, Component> all_comp = new HashMap<String, Component>();
+
+        StormTopology storm_topo = this.topology;
+        // spouts
+        if (storm_topo.get_spouts() != null) {
+            for (Map.Entry<String, SpoutSpec> spoutEntry : storm_topo
+                    .get_spouts().entrySet()) {
+                if (!Utils.isSystemId(spoutEntry.getKey())) {
+                    Component newComp = null;
+                    if (all_comp.containsKey(spoutEntry.getKey())) {
+                        newComp = all_comp.get(spoutEntry.getKey());
+                        newComp.execs = componentToExecs(newComp.id);
+                    } else {
+                        newComp = new Component(spoutEntry.getKey());
+                        newComp.execs = componentToExecs(newComp.i

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40849301
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java 
---
@@ -79,11 +95,302 @@ public StormTopology getTopology() {
                 ret.put(executor, compId);
             }
         }
-
+
         return ret;
     }
-
+
     public Collection<ExecutorDetails> getExecutors() {
         return this.executorToComponent.keySet();
     }
+
+    private void initResourceList() {
+        _resourceList = new HashMap<ExecutorDetails, Map<String, Double>>();
+        // Extract bolt memory info
+        if (this.topology.get_bolts() != null) {
+            for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
+                //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
+                Map<String, Double> topology_resources = ResourceUtils.parseResources(bolt
+                        .getValue().get_common().get_json_conf());
+                ResourceUtils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf);
+                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
+                    if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
+                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
+                    }
+                }
+            }
+        }
+        // Extract spout memory info
+        if (this.topology.get_spouts() != null) {
+            for (Map.Entry<String, SpoutSpec> spout : this.topology.get_spouts().entrySet()) {
+                Map<String, Double> topology_resources = ResourceUtils.parseResources(spout
+                        .getValue().get_common().get_json_conf());
+                ResourceUtils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf);
+                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
+                    if (spout.getKey().equals(anExecutorToComponent.getValue())) {
+                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
+                    }
+                }
+            }
+        } else {
+            LOG.warn("Topology " + topologyId + " does not seem to have any spouts!");
+        }
+        //schedule tasks that are not part of components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks)
+        for(ExecutorDetails exec : this.getExecutors()) {
+            if (_resourceList.containsKey(exec) == false) {
+                LOG.debug(
+                        "Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}",
+                        this.getExecutorToComponent().get(exec),
+                        exec,
+                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
+                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
+                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
+                this.addDefaultResforExec(exec);
+            }
+        }
+    }
+
+    private List<ExecutorDetails> componentToExecs(String comp) {
+        List<ExecutorDetails> execs = new ArrayList<>();
+        for (Map.Entry<ExecutorDetails, String> entry : executorToComponent.entrySet()) {
+            if (entry.getValue().equals(comp)) {
+                execs.add(entry.getKey());
+            }
+        }
+        return execs;
+    }
+
+    /**
+     * Returns a representation of the non-system components of the topology graph
+     * Each Component object in the returning map is populated with the list of its
+     * parents, children and execs assigned to that component.
+     * @return a map of components
+     */
+    public Map<String, Component> getComponents() {
+        Map<String, Component> all_comp = new HashMap<String, Component>();
+
+        StormTopology storm_topo = this.topology;
+        // spouts
+        if (storm_topo.get_spouts() != null) {
+            for (Map.Entry<String, SpoutSpec> spoutEntry : storm_topo
+                    .get_spouts().entrySet()) {
+                if (!Utils.isSystemId(spoutEntry.getKey())) {
+                    Component newComp = null;
+                    if (all_comp.containsKey(spoutEntry.getKey())) {
+                        newComp = all_comp.get(spoutEntry.getKey());
+                        newComp.execs = componentToExecs(newComp.id);
+                    } else {
+                        newComp = new Component(spoutEntry.getKey());
+                        newComp.execs = componentToExecs(newComp.i

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40849279
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java 
---
@@ -79,11 +95,302 @@ public StormTopology getTopology() {
                 ret.put(executor, compId);
             }
         }
-
+
         return ret;
     }
-
+
     public Collection<ExecutorDetails> getExecutors() {
         return this.executorToComponent.keySet();
     }
+
+    private void initResourceList() {
+        _resourceList = new HashMap<ExecutorDetails, Map<String, Double>>();
+        // Extract bolt memory info
+        if (this.topology.get_bolts() != null) {
+            for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
+                //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
+                Map<String, Double> topology_resources = ResourceUtils.parseResources(bolt
+                        .getValue().get_common().get_json_conf());
+                ResourceUtils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf);
+                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
+                    if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
+                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
+                    }
+                }
+            }
+        }
+        // Extract spout memory info
+        if (this.topology.get_spouts() != null) {
+            for (Map.Entry<String, SpoutSpec> spout : this.topology.get_spouts().entrySet()) {
+                Map<String, Double> topology_resources = ResourceUtils.parseResources(spout
+                        .getValue().get_common().get_json_conf());
+                ResourceUtils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf);
+                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
+                    if (spout.getKey().equals(anExecutorToComponent.getValue())) {
+                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
+                    }
+                }
+            }
+        } else {
+            LOG.warn("Topology " + topologyId + " does not seem to have any spouts!");
+        }
+        //schedule tasks that are not part of components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks)
+        for(ExecutorDetails exec : this.getExecutors()) {
+            if (_resourceList.containsKey(exec) == false) {
+                LOG.debug(
+                        "Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}",
+                        this.getExecutorToComponent().get(exec),
+                        exec,
+                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
+                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
+                        this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
+                this.addDefaultResforExec(exec);
+            }
+        }
+    }
+
+    private List<ExecutorDetails> componentToExecs(String comp) {
+        List<ExecutorDetails> execs = new ArrayList<>();
+        for (Map.Entry<ExecutorDetails, String> entry : executorToComponent.entrySet()) {
+            if (entry.getValue().equals(comp)) {
+                execs.add(entry.getKey());
+            }
+        }
+        return execs;
+    }
+
+/**
+ * Returns a representation of the non-system components of the 
topology graph
+ * Each Component object in the returning map is populated with the 
list of its
+ * parents, children and execs assigned to that component.
+ * @return a map of components
+ */
+public Map getComponents() {
+Map all_comp = new HashMap();
+
+StormTopology storm_topo = this.topology;
+// spouts
+if (storm_topo.get_spouts() != null) {
+for (Map.Entry spoutEntry : storm_topo
+.get_spouts().entrySet()) {
+if (!Utils.isSystemId(spoutEntry.getKey())) {
+Component newComp = null;
+if (all_comp.containsKey(spoutEntry.getKey())) {
+newComp = all_comp.get(spoutEntry.getKey());
+newComp.execs = componentToExecs(newComp.id);
+} else {
+newComp = new Component(spoutEntry.getKey());
+newComp.execs = componentToExecs(newComp.i

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40849130
  
--- Diff: 
storm-core/src/jvm/backtype/storm/networktopography/AbstractDNSToSwitchMapping.java
 ---
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.networktopography;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This is a base class for DNS to Switch mappings.  It is not 
mandatory to
+ * derive {@link DNSToSwitchMapping} implementations from it, but it is 
strongly
+ * recommended, as it makes it easy for the developers to add new methods
+ * to this base class that are automatically picked up by all 
implementations.
+ * 
+ *
+ */
+public abstract class AbstractDNSToSwitchMapping
+implements DNSToSwitchMapping {
+
+//  private Configuration conf;
--- End diff --

will remove




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40848979
  
--- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java ---
@@ -442,4 +446,35 @@ public static String submitJar(Map conf, String 
localJar, ProgressListener liste
  */
 public void onCompleted(String srcFile, String targetFile, long 
totalBytes);
 }
+
+
--- End diff --

will remove




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread revans2
Github user revans2 commented on the pull request:

https://github.com/apache/storm/pull/746#issuecomment-144514012
  
@jerrypeng for the most part the code looks really good.  I would mostly 
just like to see a lot of the debug logging turned into LOG.debug instead of 
LOG.info statements.

For everyone else I think it is important to point out that resource aware 
scheduling is still an experimental feature.  The APIs should be fairly stable, 
but there is a lot more work to be done with it before it is production ready.
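To illustrate the logging request above, here is a minimal sketch of why per-pass chatter belongs at debug level. It uses `java.util.logging` only so that it runs without dependencies; `Level.FINE` stands in for SLF4J's `LOG.debug`, and the class name, messages, and `runOnce` helper are hypothetical, not taken from the pull request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class SchedulerLoggingSketch {
    private static final Logger LOG =
            Logger.getLogger(SchedulerLoggingSketch.class.getName());

    /** Collects published messages so the effect of the logger level is observable. */
    static final class CapturingHandler extends Handler {
        final List<String> messages = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            if (isLoggable(record)) {
                messages.add(record.getMessage());
            }
        }

        @Override
        public void flush() { }

        @Override
        public void close() { }
    }

    /** Runs one pretend scheduling pass and returns the messages actually emitted. */
    static List<String> runOnce(Level loggerLevel) {
        CapturingHandler handler = new CapturingHandler();
        handler.setLevel(Level.ALL);
        LOG.setUseParentHandlers(false); // keep the demo off the console
        LOG.setLevel(loggerLevel);
        LOG.addHandler(handler);
        try {
            // Per-pass detail: FINE here plays the role of LOG.debug in SLF4J.
            LOG.fine("Rerunning scheduler...");
            // Operator-relevant event: stays at INFO.
            LOG.info("Scheduled topology demo-topology");
            return handler.messages;
        } finally {
            LOG.removeHandler(handler);
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce(Level.INFO)); // production-style level
        System.out.println(runOnce(Level.FINE)); // debugging-style level
    }
}
```

With the logger at `INFO`, only the scheduling result is emitted; the per-pass message appears only once the level is lowered for debugging.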




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40838906
  
--- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java 
---
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies;
+
+import java.util.Collection;
+import java.util.Map;
+
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.ExecutorDetails;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.scheduler.resource.RAS_Node;
+
+public interface IStrategy {
--- End diff --

Can we add a comment about what a Strategy is?




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40838824
  
--- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java ---
@@ -0,0 +1,117 @@
+package backtype.storm.scheduler.resource;
+
+import backtype.storm.Config;
+import backtype.storm.generated.Bolt;
+import backtype.storm.generated.SpoutSpec;
+import backtype.storm.generated.StormTopology;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Created by jerrypeng on 9/22/15.
+ */
+public class ResourceUtils {
--- End diff --

Git should take care of authorship history; it does not belong in code comments.




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40838769
  
--- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java ---
@@ -0,0 +1,117 @@
+package backtype.storm.scheduler.resource;
--- End diff --

This file needs the Apache license header.




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40838571
  
--- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
 ---
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource;
+
+import java.util.*;
+
+import backtype.storm.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.ExecutorDetails;
+import backtype.storm.scheduler.IScheduler;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.scheduler.resource.strategies.ResourceAwareStrategy;
+
+public class ResourceAwareScheduler implements IScheduler {
+private static final Logger LOG = LoggerFactory
+.getLogger(ResourceAwareScheduler.class);
+@SuppressWarnings("rawtypes")
+private Map _conf;
+
+@Override
+public void prepare(Map conf) {
+_conf = conf;
+}
+
+@Override
+public void schedule(Topologies topologies, Cluster cluster) {
+LOG.info("\n\n\nRerunning ResourceAwareScheduler...");
--- End diff --

Change this to debug or remove it.




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40838543
  
--- Diff: 
storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
 ---
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource;
+
+import java.util.*;
--- End diff --

I don't really like .* imports; they can result in errors if new classes are 
added to that package.  It is not a big deal, but it would be nice to avoid them.
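A minimal sketch of the failure mode, using two JDK packages that both declare a `Date` class (the class name `WildcardImportSketch` is hypothetical). With only the two wildcard imports, the simple name `Date` is ambiguous and the file does not compile; the same thing can happen to existing code when a package adds a class whose name clashes with another wildcard import. The explicit single-type import resolves it:

```java
import java.sql.*;      // declares java.sql.Date
import java.util.*;     // declares java.util.Date
import java.util.Date;  // single-type import takes precedence over both wildcards

public class WildcardImportSketch {
    // Without the explicit import above, "Date" here would be an
    // ambiguous reference and compilation would fail.
    static Date epoch() {
        return new Date(0L);
    }

    public static void main(String[] args) {
        System.out.println(epoch().getClass().getName());
    }
}
```

Listing each class explicitly, as in the last import, keeps a file compiling no matter what is later added to the wildcard-imported packages.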




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40838272
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java 
---
@@ -79,11 +95,302 @@ public StormTopology getTopology() {
 ret.put(executor, compId);
 }
 }
-
+
 return ret;
 }
-
+
 public Collection getExecutors() {
 return this.executorToComponent.keySet();
 }
+
+private void initResourceList() {
+_resourceList = new HashMap>();
+// Extract bolt memory info
+if (this.topology.get_bolts() != null) {
+for (Map.Entry bolt : 
this.topology.get_bolts().entrySet()) {
+//the json_conf is populated by TopologyBuilder (e.g. 
boltDeclarer.setMemoryLoad)
+Map topology_resources = 
ResourceUtils.parseResources(bolt
+.getValue().get_common().get_json_conf());
+ResourceUtils.checkIntialization(topology_resources, 
bolt.getValue().toString(), this.topologyConf);
+for (Map.Entry 
anExecutorToComponent : executorToComponent.entrySet()) {
+if 
(bolt.getKey().equals(anExecutorToComponent.getValue())) {
+_resourceList.put(anExecutorToComponent.getKey(), 
topology_resources);
+}
+}
+}
+}
+// Extract spout memory info
+if (this.topology.get_spouts() != null) {
+for (Map.Entry spout : 
this.topology.get_spouts().entrySet()) {
+Map topology_resources = 
ResourceUtils.parseResources(spout
+.getValue().get_common().get_json_conf());
+ResourceUtils.checkIntialization(topology_resources, 
spout.getValue().toString(), this.topologyConf);
+for (Map.Entry 
anExecutorToComponent : executorToComponent.entrySet()) {
+if 
(spout.getKey().equals(anExecutorToComponent.getValue())) {
+_resourceList.put(anExecutorToComponent.getKey(), 
topology_resources);
+}
+}
+}
+} else {
+LOG.warn("Topology " + topologyId + " does not seem to have 
any spouts!");
+}
+//schedule tasks that are not part of components returned from 
topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker 
tasks)
+for(ExecutorDetails exec : this.getExecutors()) {
+if (_resourceList.containsKey(exec) == false) {
+LOG.debug(
+"Scheduling {} {} with memory requirement as 'on 
heap' - {} and 'off heap' - {} and CPU requirement as {}",
+this.getExecutorToComponent().get(exec),
+exec,
+
this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
+
this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
+
this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
+this.addDefaultResforExec(exec);
+} 
+}
+}
+
+private List componentToExecs(String comp) {
+List execs = new ArrayList<>();
+for (Map.Entry entry : 
executorToComponent.entrySet()) {
+if (entry.getValue().equals(comp)) {
+execs.add(entry.getKey());
+}
+}
+return execs;
+}
+
+/**
+ * Returns a representation of the non-system components of the 
topology graph
+ * Each Component object in the returning map is populated with the 
list of its
+ * parents, children and execs assigned to that component.
+ * @return a map of components
+ */
+public Map getComponents() {
+Map all_comp = new HashMap();
+
+StormTopology storm_topo = this.topology;
+// spouts
+if (storm_topo.get_spouts() != null) {
+for (Map.Entry spoutEntry : storm_topo
+.get_spouts().entrySet()) {
+if (!Utils.isSystemId(spoutEntry.getKey())) {
+Component newComp = null;
+if (all_comp.containsKey(spoutEntry.getKey())) {
+newComp = all_comp.get(spoutEntry.getKey());
+newComp.execs = componentToExecs(newComp.id);
+} else {
+newComp = new Component(spoutEntry.getKey());
+newComp.execs = componentToExecs(newComp.id)

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40838180
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java 
---
@@ -79,11 +95,302 @@ public StormTopology getTopology() {
 ret.put(executor, compId);
 }
 }
-
+
 return ret;
 }
-
+
 public Collection getExecutors() {
 return this.executorToComponent.keySet();
 }
+
+private void initResourceList() {
+_resourceList = new HashMap>();
+// Extract bolt memory info
+if (this.topology.get_bolts() != null) {
+for (Map.Entry bolt : 
this.topology.get_bolts().entrySet()) {
+//the json_conf is populated by TopologyBuilder (e.g. 
boltDeclarer.setMemoryLoad)
+Map topology_resources = 
ResourceUtils.parseResources(bolt
+.getValue().get_common().get_json_conf());
+ResourceUtils.checkIntialization(topology_resources, 
bolt.getValue().toString(), this.topologyConf);
+for (Map.Entry 
anExecutorToComponent : executorToComponent.entrySet()) {
+if 
(bolt.getKey().equals(anExecutorToComponent.getValue())) {
+_resourceList.put(anExecutorToComponent.getKey(), 
topology_resources);
+}
+}
+}
+}
+// Extract spout memory info
+if (this.topology.get_spouts() != null) {
+for (Map.Entry spout : 
this.topology.get_spouts().entrySet()) {
+Map topology_resources = 
ResourceUtils.parseResources(spout
+.getValue().get_common().get_json_conf());
+ResourceUtils.checkIntialization(topology_resources, 
spout.getValue().toString(), this.topologyConf);
+for (Map.Entry 
anExecutorToComponent : executorToComponent.entrySet()) {
+if 
(spout.getKey().equals(anExecutorToComponent.getValue())) {
+_resourceList.put(anExecutorToComponent.getKey(), 
topology_resources);
+}
+}
+}
+} else {
+LOG.warn("Topology " + topologyId + " does not seem to have 
any spouts!");
+}
+//schedule tasks that are not part of components returned from 
topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker 
tasks)
+for(ExecutorDetails exec : this.getExecutors()) {
+if (_resourceList.containsKey(exec) == false) {
+LOG.debug(
+"Scheduling {} {} with memory requirement as 'on 
heap' - {} and 'off heap' - {} and CPU requirement as {}",
+this.getExecutorToComponent().get(exec),
+exec,
+
this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
+
this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
+
this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
+this.addDefaultResforExec(exec);
+} 
+}
+}
+
+private List componentToExecs(String comp) {
+List execs = new ArrayList<>();
+for (Map.Entry entry : 
executorToComponent.entrySet()) {
+if (entry.getValue().equals(comp)) {
+execs.add(entry.getKey());
+}
+}
+return execs;
+}
+
+/**
+ * Returns a representation of the non-system components of the 
topology graph
+ * Each Component object in the returning map is populated with the 
list of its
+ * parents, children and execs assigned to that component.
+ * @return a map of components
+ */
+public Map getComponents() {
+Map all_comp = new HashMap();
+
+StormTopology storm_topo = this.topology;
+// spouts
+if (storm_topo.get_spouts() != null) {
+for (Map.Entry spoutEntry : storm_topo
+.get_spouts().entrySet()) {
+if (!Utils.isSystemId(spoutEntry.getKey())) {
+Component newComp = null;
+if (all_comp.containsKey(spoutEntry.getKey())) {
+newComp = all_comp.get(spoutEntry.getKey());
+newComp.execs = componentToExecs(newComp.id);
+} else {
+newComp = new Component(spoutEntry.getKey());
+newComp.execs = componentToExecs(newComp.id)

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40838121
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java 
---
@@ -79,11 +95,302 @@ public StormTopology getTopology() {
 ret.put(executor, compId);
 }
 }
-
+
 return ret;
 }
-
+
 public Collection getExecutors() {
 return this.executorToComponent.keySet();
 }
+
+private void initResourceList() {
+_resourceList = new HashMap>();
+// Extract bolt memory info
+if (this.topology.get_bolts() != null) {
+for (Map.Entry bolt : 
this.topology.get_bolts().entrySet()) {
+//the json_conf is populated by TopologyBuilder (e.g. 
boltDeclarer.setMemoryLoad)
+Map topology_resources = 
ResourceUtils.parseResources(bolt
+.getValue().get_common().get_json_conf());
+ResourceUtils.checkIntialization(topology_resources, 
bolt.getValue().toString(), this.topologyConf);
+for (Map.Entry 
anExecutorToComponent : executorToComponent.entrySet()) {
+if 
(bolt.getKey().equals(anExecutorToComponent.getValue())) {
+_resourceList.put(anExecutorToComponent.getKey(), 
topology_resources);
+}
+}
+}
+}
+// Extract spout memory info
+if (this.topology.get_spouts() != null) {
+for (Map.Entry spout : 
this.topology.get_spouts().entrySet()) {
+Map topology_resources = 
ResourceUtils.parseResources(spout
+.getValue().get_common().get_json_conf());
+ResourceUtils.checkIntialization(topology_resources, 
spout.getValue().toString(), this.topologyConf);
+for (Map.Entry 
anExecutorToComponent : executorToComponent.entrySet()) {
+if 
(spout.getKey().equals(anExecutorToComponent.getValue())) {
+_resourceList.put(anExecutorToComponent.getKey(), 
topology_resources);
+}
+}
+}
+} else {
+LOG.warn("Topology " + topologyId + " does not seem to have 
any spouts!");
+}
+//schedule tasks that are not part of components returned from 
topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker 
tasks)
+for(ExecutorDetails exec : this.getExecutors()) {
+if (_resourceList.containsKey(exec) == false) {
+LOG.debug(
+"Scheduling {} {} with memory requirement as 'on 
heap' - {} and 'off heap' - {} and CPU requirement as {}",
+this.getExecutorToComponent().get(exec),
+exec,
+
this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
+
this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
+
this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
+this.addDefaultResforExec(exec);
+} 
+}
+}
+
+private List componentToExecs(String comp) {
+List execs = new ArrayList<>();
+for (Map.Entry entry : 
executorToComponent.entrySet()) {
+if (entry.getValue().equals(comp)) {
+execs.add(entry.getKey());
+}
+}
+return execs;
+}
+
+/**
+ * Returns a representation of the non-system components of the 
topology graph
+ * Each Component object in the returning map is populated with the 
list of its
+ * parents, children and execs assigned to that component.
+ * @return a map of components
+ */
+public Map getComponents() {
+Map all_comp = new HashMap();
+
+StormTopology storm_topo = this.topology;
+// spouts
+if (storm_topo.get_spouts() != null) {
+for (Map.Entry spoutEntry : storm_topo
+.get_spouts().entrySet()) {
+if (!Utils.isSystemId(spoutEntry.getKey())) {
+Component newComp = null;
+if (all_comp.containsKey(spoutEntry.getKey())) {
+newComp = all_comp.get(spoutEntry.getKey());
+newComp.execs = componentToExecs(newComp.id);
+} else {
+newComp = new Component(spoutEntry.getKey());
+newComp.execs = componentToExecs(newComp.id)

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40838080
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java 
---
@@ -79,11 +95,302 @@ public StormTopology getTopology() {
 ret.put(executor, compId);
 }
 }
-
+
 return ret;
 }
-
+
 public Collection getExecutors() {
 return this.executorToComponent.keySet();
 }
+
+private void initResourceList() {
+_resourceList = new HashMap>();
+// Extract bolt memory info
+if (this.topology.get_bolts() != null) {
+for (Map.Entry bolt : 
this.topology.get_bolts().entrySet()) {
+//the json_conf is populated by TopologyBuilder (e.g. 
boltDeclarer.setMemoryLoad)
+Map topology_resources = 
ResourceUtils.parseResources(bolt
+.getValue().get_common().get_json_conf());
+ResourceUtils.checkIntialization(topology_resources, 
bolt.getValue().toString(), this.topologyConf);
+for (Map.Entry 
anExecutorToComponent : executorToComponent.entrySet()) {
+if 
(bolt.getKey().equals(anExecutorToComponent.getValue())) {
+_resourceList.put(anExecutorToComponent.getKey(), 
topology_resources);
+}
+}
+}
+}
+// Extract spout memory info
+if (this.topology.get_spouts() != null) {
+for (Map.Entry spout : 
this.topology.get_spouts().entrySet()) {
+Map topology_resources = 
ResourceUtils.parseResources(spout
+.getValue().get_common().get_json_conf());
+ResourceUtils.checkIntialization(topology_resources, 
spout.getValue().toString(), this.topologyConf);
+for (Map.Entry 
anExecutorToComponent : executorToComponent.entrySet()) {
+if 
(spout.getKey().equals(anExecutorToComponent.getValue())) {
+_resourceList.put(anExecutorToComponent.getKey(), 
topology_resources);
+}
+}
+}
+} else {
+LOG.warn("Topology " + topologyId + " does not seem to have 
any spouts!");
+}
+//schedule tasks that are not part of components returned from 
topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker 
tasks)
+for(ExecutorDetails exec : this.getExecutors()) {
+if (_resourceList.containsKey(exec) == false) {
+LOG.debug(
+"Scheduling {} {} with memory requirement as 'on 
heap' - {} and 'off heap' - {} and CPU requirement as {}",
+this.getExecutorToComponent().get(exec),
+exec,
+
this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
+
this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
+
this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
+this.addDefaultResforExec(exec);
+} 
+}
+}
+
+private List componentToExecs(String comp) {
+List execs = new ArrayList<>();
+for (Map.Entry entry : 
executorToComponent.entrySet()) {
+if (entry.getValue().equals(comp)) {
+execs.add(entry.getKey());
+}
+}
+return execs;
+}
+
+/**
+ * Returns a representation of the non-system components of the 
topology graph
+ * Each Component object in the returning map is populated with the 
list of its
+ * parents, children and execs assigned to that component.
+ * @return a map of components
+ */
+public Map getComponents() {
+Map all_comp = new HashMap();
+
+StormTopology storm_topo = this.topology;
+// spouts
+if (storm_topo.get_spouts() != null) {
+for (Map.Entry spoutEntry : storm_topo
+.get_spouts().entrySet()) {
+if (!Utils.isSystemId(spoutEntry.getKey())) {
+Component newComp = null;
+if (all_comp.containsKey(spoutEntry.getKey())) {
+newComp = all_comp.get(spoutEntry.getKey());
+newComp.execs = componentToExecs(newComp.id);
+} else {
+newComp = new Component(spoutEntry.getKey());
+newComp.execs = componentToExecs(newComp.id)

[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40837655
  
--- Diff: 
storm-core/src/jvm/backtype/storm/networktopography/AbstractDNSToSwitchMapping.java
 ---
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.networktopography;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This is a base class for DNS to Switch mappings.  It is not 
mandatory to
+ * derive {@link DNSToSwitchMapping} implementations from it, but it is 
strongly
+ * recommended, as it makes it easy for the developers to add new methods
+ * to this base class that are automatically picked up by all 
implementations.
+ * 
+ *
+ */
+public abstract class AbstractDNSToSwitchMapping
+implements DNSToSwitchMapping {
+
+//  private Configuration conf;
--- End diff --

Let's remove this if it is not used.




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40837456
  
--- Diff: 
storm-core/src/jvm/backtype/storm/generated/AlreadyAliveException.java ---
@@ -51,7 +51,7 @@
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = 
"2015-2-6")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = 
"2015-9-30")
--- End diff --

Can we revert the generated files that only have a date change in them?




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40837399
  
--- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java ---
@@ -442,4 +446,35 @@ public static String submitJar(Map conf, String localJar, ProgressListener liste
  */
 public void onCompleted(String srcFile, String targetFile, long totalBytes);
 }
+
+
+private static void validateConfs(Map stormConf, StormTopology topology) throws IllegalArgumentException {
+double largestMemReq = getMaxExecutorMemoryUsageForTopo(topology, stormConf);
+Double topologyWorkerMaxHeapSize = Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB));
+if(topologyWorkerMaxHeapSize < largestMemReq) {
+throw new IllegalArgumentException("Topology will not be able to be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB="
++Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB)) + " < " 
++ largestMemReq + " (Largest memory requirement of a component in the topology). Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount");
+}
+}
+
+
--- End diff --

Extra line
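
For readers following the thread, the check that validateConfs performs in the diff above can be sketched in isolation. This is only a minimal sketch under stated assumptions, not the code from the pull request: the class name HeapSizeCheckSketch is hypothetical, the string value of the config key is assumed rather than taken from the diff, and the largest per-component memory requirement is passed in directly instead of being computed from a StormTopology.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the submit-time check; not part of Storm.
final class HeapSizeCheckSketch {
    // Assumed literal for Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB.
    static final String WORKER_MAX_HEAP_MB = "topology.worker.max.heap.size.mb";

    // Rejects a configuration whose worker heap cannot hold the largest component.
    static void validate(Map<String, Object> conf, double largestMemReqMb) {
        Object raw = conf.get(WORKER_MAX_HEAP_MB);
        if (!(raw instanceof Number)) {
            // This sketch treats a missing or non-numeric value as an error.
            throw new IllegalArgumentException(WORKER_MAX_HEAP_MB + " must be set to a number");
        }
        double maxHeapMb = ((Number) raw).doubleValue();
        if (maxHeapMb < largestMemReqMb) {
            throw new IllegalArgumentException(
                "Topology cannot be scheduled: " + WORKER_MAX_HEAP_MB + "=" + maxHeapMb
                + " < " + largestMemReqMb + " (largest memory requirement of a component)");
        }
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(WORKER_MAX_HEAP_MB, 768.0);
        validate(conf, 512.0); // passes: the heap is large enough
        try {
            validate(conf, 1024.0); // fails: a component needs more than the heap
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing at submit time, as the diff does, surfaces the misconfiguration to the user right away rather than leaving a topology that can never be scheduled.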




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40837341
  
--- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java ---
@@ -442,4 +446,35 @@ public static String submitJar(Map conf, String localJar, ProgressListener liste
  */
 public void onCompleted(String srcFile, String targetFile, long totalBytes);
 }
+
+
--- End diff --

Extra space




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-30 Thread jerrypeng
Github user jerrypeng commented on the pull request:

https://github.com/apache/storm/pull/746#issuecomment-144451299
  
upmerged!




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40735548
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/Topologies.java ---
@@ -21,10 +21,17 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import backtype.storm.scheduler.resource.RAS_Component;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class Topologies {
 Map topologies;
 Map nameToId;
-
+Map> _allRAS_Components;
+
+private static final Logger LOG = LoggerFactory.getLogger(Topologies.class);
--- End diff --

will remove




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40735533
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
@@ -79,11 +96,304 @@ public StormTopology getTopology() {
 ret.put(executor, compId);
 }
 }
-
+
 return ret;
 }
-
+
 public Collection getExecutors() {
 return this.executorToComponent.keySet();
 }
+
+private void initResourceList() {
+_resourceList = new HashMap>();
+// Extract bolt memory info
+if (this.topology.get_bolts() != null) {
+for (Map.Entry bolt : 
this.topology.get_bolts().entrySet()) {
+//the json_conf is populated by TopologyBuilder (e.g. 
boltDeclarer.setMemoryLoad)
+Map topology_resources = 
backtype.storm.scheduler.resource.Utils.parseResources(bolt
+.getValue().get_common().get_json_conf());
+
backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, 
bolt.getValue().toString(), this.topologyConf);
+for (Map.Entry 
anExecutorToComponent : executorToComponent.entrySet()) {
+if 
(bolt.getKey().equals(anExecutorToComponent.getValue())) {
+_resourceList.put(anExecutorToComponent.getKey(), 
topology_resources);
+}
+}
+}
+} else {
+LOG.warn("Topology " + topologyId + " does not seem to have 
any bolts!");
+}
+// Extract spout memory info
+if (this.topology.get_spouts() != null) {
+for (Map.Entry spout : 
this.topology.get_spouts().entrySet()) {
+Map topology_resources = 
backtype.storm.scheduler.resource.Utils.parseResources(spout
+.getValue().get_common().get_json_conf());
+
backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, 
spout.getValue().toString(), this.topologyConf);
+for (Map.Entry 
anExecutorToComponent : executorToComponent.entrySet()) {
+if 
(spout.getKey().equals(anExecutorToComponent.getValue())) {
+_resourceList.put(anExecutorToComponent.getKey(), 
topology_resources);
+}
+}
+}
+} else {
+LOG.warn("Topology " + topologyId + " does not seem to have 
any spouts!");
+}
+//schedule tasks that are not part of components returned from 
topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker 
tasks)
+for(ExecutorDetails exec : this.getExecutors()) {
+if (_resourceList.containsKey(exec) == false) {
+LOG.info(
+"Scheduling {} {} with memory requirement as 'on 
heap' - {} and 'off heap' - {} and CPU requirement as {}",
+this.getExecutorToComponent().get(exec),
+exec,
+
this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
+
this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
+
this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
+this.addDefaultResforExec(exec);
+} 
+}
+}
+
+private List componentToExecs(String comp) {
+List execs = new ArrayList<>();
+for (Map.Entry entry : 
executorToComponent.entrySet()) {
+if (entry.getValue().equals(comp)) {
+execs.add(entry.getKey());
+}
+}
+return execs;
+}
+
+/**
+ * Returns a representation of the non-system components of the 
topology graph
+ * Each Component object in the returning map is populated with the 
list of its
+ * parents, children and execs assigned to that component.
+ * @return a map of components
+ */
+public Map getRAS_Components() {
--- End diff --

will rename




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40734516
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
@@ -79,11 +96,304 @@ public StormTopology getTopology() {
 ret.put(executor, compId);
 }
 }
-
+
 return ret;
 }
-
+
 public Collection getExecutors() {
 return this.executorToComponent.keySet();
 }
+
+private void initResourceList() {
+_resourceList = new HashMap>();
+// Extract bolt memory info
+if (this.topology.get_bolts() != null) {
+for (Map.Entry bolt : 
this.topology.get_bolts().entrySet()) {
+//the json_conf is populated by TopologyBuilder (e.g. 
boltDeclarer.setMemoryLoad)
+Map topology_resources = 
backtype.storm.scheduler.resource.Utils.parseResources(bolt
+.getValue().get_common().get_json_conf());
+
backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, 
bolt.getValue().toString(), this.topologyConf);
+for (Map.Entry 
anExecutorToComponent : executorToComponent.entrySet()) {
+if 
(bolt.getKey().equals(anExecutorToComponent.getValue())) {
+_resourceList.put(anExecutorToComponent.getKey(), 
topology_resources);
+}
+}
+}
+} else {
+LOG.warn("Topology " + topologyId + " does not seem to have 
any bolts!");
+}
+// Extract spout memory info
+if (this.topology.get_spouts() != null) {
+for (Map.Entry spout : 
this.topology.get_spouts().entrySet()) {
+Map topology_resources = 
backtype.storm.scheduler.resource.Utils.parseResources(spout
+.getValue().get_common().get_json_conf());
+
backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, 
spout.getValue().toString(), this.topologyConf);
+for (Map.Entry 
anExecutorToComponent : executorToComponent.entrySet()) {
+if 
(spout.getKey().equals(anExecutorToComponent.getValue())) {
+_resourceList.put(anExecutorToComponent.getKey(), 
topology_resources);
+}
+}
+}
+} else {
+LOG.warn("Topology " + topologyId + " does not seem to have 
any spouts!");
+}
+//schedule tasks that are not part of components returned from 
topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker 
tasks)
+for(ExecutorDetails exec : this.getExecutors()) {
+if (_resourceList.containsKey(exec) == false) {
+LOG.info(
--- End diff --

will change




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40734496
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
@@ -79,11 +96,304 @@ public StormTopology getTopology() {
 ret.put(executor, compId);
 }
 }
-
+
 return ret;
 }
-
+
 public Collection getExecutors() {
 return this.executorToComponent.keySet();
 }
+
+private void initResourceList() {
+_resourceList = new HashMap>();
+// Extract bolt memory info
+if (this.topology.get_bolts() != null) {
+for (Map.Entry bolt : 
this.topology.get_bolts().entrySet()) {
+//the json_conf is populated by TopologyBuilder (e.g. 
boltDeclarer.setMemoryLoad)
+Map topology_resources = 
backtype.storm.scheduler.resource.Utils.parseResources(bolt
+.getValue().get_common().get_json_conf());
+
backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, 
bolt.getValue().toString(), this.topologyConf);
+for (Map.Entry 
anExecutorToComponent : executorToComponent.entrySet()) {
+if 
(bolt.getKey().equals(anExecutorToComponent.getValue())) {
+_resourceList.put(anExecutorToComponent.getKey(), 
topology_resources);
+}
+}
+}
+} else {
+LOG.warn("Topology " + topologyId + " does not seem to have 
any bolts!");
--- End diff --

will remove




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40734280
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
@@ -31,41 +42,47 @@
 StormTopology topology;
 Map executorToComponent;
 int numWorkers;
- 
+//>>
+private Map> _resourceList;
+//Scheduler this topology should be scheduled with
+private String scheduler;
--- End diff --

will delete




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40732961
  
--- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java ---
@@ -442,4 +444,36 @@ public static String submitJar(Map conf, String localJar, ProgressListener liste
  */
 public void onCompleted(String srcFile, String targetFile, long totalBytes);
 }
+
+
+private static void validateConfs(Map stormConf, StormTopology topology) throws IllegalArgumentException {
+LOG.info("Validating storm Confs...");
+double largestMemReq = getMaxExecutorMemoryUsageForTopo(topology, stormConf);
+Double topologyWorkerMaxHeapSize = Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB));
+if(topologyWorkerMaxHeapSize < largestMemReq) {
+throw new IllegalArgumentException("Topology will not be able to be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB="
++Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB)) + " < " 
++ largestMemReq + " (Largest memory requirement of a component in the topology). Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount");
+}
+}
+
+
+private static double getMaxExecutorMemoryUsageForTopo(StormTopology topology, Map topologyConf) {
+double largestMemoryOperator = 0.0;
+for(Map entry : backtype.storm.scheduler.resource.Utils.getBoltsResources(topology, topologyConf).values()) {
--- End diff --

rename




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40732321
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
@@ -79,11 +96,304 @@ public StormTopology getTopology() {
 ret.put(executor, compId);
 }
 }
-
+
 return ret;
 }
-
+
 public Collection getExecutors() {
 return this.executorToComponent.keySet();
 }
+
+private void initResourceList() {
+_resourceList = new HashMap>();
+// Extract bolt memory info
+if (this.topology.get_bolts() != null) {
+for (Map.Entry bolt : 
this.topology.get_bolts().entrySet()) {
+//the json_conf is populated by TopologyBuilder (e.g. 
boltDeclarer.setMemoryLoad)
+Map topology_resources = 
backtype.storm.scheduler.resource.Utils.parseResources(bolt
+.getValue().get_common().get_json_conf());
+
backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, 
bolt.getValue().toString(), this.topologyConf);
+for (Map.Entry 
anExecutorToComponent : executorToComponent.entrySet()) {
+if 
(bolt.getKey().equals(anExecutorToComponent.getValue())) {
+_resourceList.put(anExecutorToComponent.getKey(), 
topology_resources);
+}
+}
+}
+} else {
+LOG.warn("Topology " + topologyId + " does not seem to have 
any bolts!");
+}
+// Extract spout memory info
+if (this.topology.get_spouts() != null) {
+for (Map.Entry spout : 
this.topology.get_spouts().entrySet()) {
+Map topology_resources = 
backtype.storm.scheduler.resource.Utils.parseResources(spout
+.getValue().get_common().get_json_conf());
+
backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, 
spout.getValue().toString(), this.topologyConf);
+for (Map.Entry 
anExecutorToComponent : executorToComponent.entrySet()) {
+if 
(spout.getKey().equals(anExecutorToComponent.getValue())) {
+_resourceList.put(anExecutorToComponent.getKey(), 
topology_resources);
+}
+}
+}
+} else {
+LOG.warn("Topology " + topologyId + " does not seem to have 
any spouts!");
+}
+//schedule tasks that are not part of components returned from 
topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker 
tasks)
+for(ExecutorDetails exec : this.getExecutors()) {
+if (_resourceList.containsKey(exec) == false) {
+LOG.info(
--- End diff --

Can we make this debug instead?  I don't think we need to tell everyone all 
the time about every topology's resources.
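
To illustrate the suggestion, here is a minimal sketch of moving such a message from info to debug level. It is an assumption-laden stand-in: it uses java.util.logging (where FINE plays the role of debug) so that it runs without extra dependencies, whereas the code under review uses SLF4J; the class name DebugLevelSketch and the method logDefaults are hypothetical.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical illustration; the pull request itself logs through SLF4J.
final class DebugLevelSketch {
    static final Logger LOG = Logger.getLogger(DebugLevelSketch.class.getName());

    // Logs the default resources at FINE (debug) level and reports whether
    // that level is currently enabled for this logger.
    static boolean logDefaults(String component, Object onHeapMb, Object offHeapMb, Object cpu) {
        boolean enabled = LOG.isLoggable(Level.FINE);
        LOG.log(Level.FINE,
                "Scheduling {0} with on-heap {1} MB, off-heap {2} MB, CPU {3}",
                new Object[] {component, onHeapMb, offHeapMb, cpu});
        return enabled;
    }

    public static void main(String[] args) {
        // With the logger at INFO the FINE message is suppressed.
        LOG.setLevel(Level.INFO);
        System.out.println("debug enabled: " + logDefaults("__acker", 128.0, 0.0, 10.0));
    }
}
```

The message stays available to anyone who turns on debug logging for the scheduler, but it no longer appears in the default logs for every topology.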




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40732622
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
@@ -79,11 +96,304 @@ public StormTopology getTopology() {
 ret.put(executor, compId);
 }
 }
-
+
 return ret;
 }
-
+
 public Collection getExecutors() {
 return this.executorToComponent.keySet();
 }
+
+private void initResourceList() {
+_resourceList = new HashMap>();
+// Extract bolt memory info
+if (this.topology.get_bolts() != null) {
+for (Map.Entry bolt : 
this.topology.get_bolts().entrySet()) {
+//the json_conf is populated by TopologyBuilder (e.g. 
boltDeclarer.setMemoryLoad)
+Map topology_resources = 
backtype.storm.scheduler.resource.Utils.parseResources(bolt
+.getValue().get_common().get_json_conf());
+
backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, 
bolt.getValue().toString(), this.topologyConf);
+for (Map.Entry 
anExecutorToComponent : executorToComponent.entrySet()) {
+if 
(bolt.getKey().equals(anExecutorToComponent.getValue())) {
+_resourceList.put(anExecutorToComponent.getKey(), 
topology_resources);
+}
+}
+}
+} else {
+LOG.warn("Topology " + topologyId + " does not seem to have 
any bolts!");
+}
+// Extract spout memory info
+if (this.topology.get_spouts() != null) {
+for (Map.Entry spout : 
this.topology.get_spouts().entrySet()) {
+Map topology_resources = 
backtype.storm.scheduler.resource.Utils.parseResources(spout
+.getValue().get_common().get_json_conf());
+
backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, 
spout.getValue().toString(), this.topologyConf);
+for (Map.Entry 
anExecutorToComponent : executorToComponent.entrySet()) {
+if 
(spout.getKey().equals(anExecutorToComponent.getValue())) {
+_resourceList.put(anExecutorToComponent.getKey(), 
topology_resources);
+}
+}
+}
+} else {
+LOG.warn("Topology " + topologyId + " does not seem to have 
any spouts!");
+}
+//schedule tasks that are not part of components returned from 
topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker 
tasks)
+for(ExecutorDetails exec : this.getExecutors()) {
+if (_resourceList.containsKey(exec) == false) {
+LOG.info(
+"Scheduling {} {} with memory requirement as 'on 
heap' - {} and 'off heap' - {} and CPU requirement as {}",
+this.getExecutorToComponent().get(exec),
+exec,
+
this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
+
this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
+
this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
+this.addDefaultResforExec(exec);
+} 
+}
+}
+
+private List componentToExecs(String comp) {
+List execs = new ArrayList<>();
+for (Map.Entry entry : 
executorToComponent.entrySet()) {
+if (entry.getValue().equals(comp)) {
+execs.add(entry.getKey());
+}
+}
+return execs;
+}
+
+/**
+ * Returns a representation of the non-system components of the 
topology graph
+ * Each Component object in the returning map is populated with the 
list of its
+ * parents, children and execs assigned to that component.
+ * @return a map of components
+ */
+public Map getRAS_Components() {
--- End diff --

I'm not sure that RAS_Component is the right name for this class or this 
method.  The class has nothing to do with resources at all.  Perhaps we can 
call it a Component and the method getComponentGraph instead?
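
As a rough picture of what the proposed naming could look like, here is a minimal sketch of a resource-free Component type and a getComponentGraph method that fills in parents, children and executors, as the Javadoc in the diff describes. Everything here is hypothetical: the field names, the use of plain strings for component ids and executors, and the edge-list input are assumptions for illustration, not the types used in the pull request.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the renamed class and method; not Storm code.
final class ComponentGraphSketch {
    static final class Component {
        final String id;
        final List<String> parents = new ArrayList<>();
        final List<String> children = new ArrayList<>();
        final List<String> execs = new ArrayList<>();

        Component(String id) {
            this.id = id;
        }
    }

    // edges: each element is {parentId, childId}.
    // executorToComponent: executor label -> id of the component it runs.
    static Map<String, Component> getComponentGraph(
            List<String[]> edges, Map<String, String> executorToComponent) {
        Map<String, Component> graph = new HashMap<>();
        for (String[] edge : edges) {
            Component parent = graph.computeIfAbsent(edge[0], Component::new);
            Component child = graph.computeIfAbsent(edge[1], Component::new);
            parent.children.add(child.id);
            child.parents.add(parent.id);
        }
        for (Map.Entry<String, String> entry : executorToComponent.entrySet()) {
            Component component = graph.get(entry.getValue());
            if (component != null) {
                // Executors of components outside the graph (for example
                // system components) are skipped.
                component.execs.add(entry.getKey());
            }
        }
        return graph;
    }

    public static void main(String[] args) {
        List<String[]> edges = new ArrayList<>();
        edges.add(new String[] {"spout", "bolt"});
        Map<String, String> execs = new HashMap<>();
        execs.put("[1-1]", "spout");
        execs.put("[2-2]", "bolt");
        execs.put("[3-3]", "__acker");
        Map<String, Component> graph = getComponentGraph(edges, execs);
        System.out.println(graph.get("bolt").parents);
    }
}
```

Nothing in this type mentions memory or CPU, which is the point of the review comment: the graph structure is independent of resource-aware scheduling.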




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40732462
  
--- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java ---
@@ -442,4 +444,36 @@ public static String submitJar(Map conf, String localJar, ProgressListener liste
  */
 public void onCompleted(String srcFile, String targetFile, long totalBytes);
 }
+
+
+private static void validateConfs(Map stormConf, StormTopology topology) throws IllegalArgumentException {
+LOG.info("Validating storm Confs...");
--- End diff --

will delete




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40732244
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
@@ -79,11 +96,304 @@ public StormTopology getTopology() {
 ret.put(executor, compId);
 }
 }
-
+
 return ret;
 }
-
+
 public Collection getExecutors() {
 return this.executorToComponent.keySet();
 }
+
+private void initResourceList() {
+_resourceList = new HashMap>();
+// Extract bolt memory info
+if (this.topology.get_bolts() != null) {
+for (Map.Entry bolt : 
this.topology.get_bolts().entrySet()) {
+//the json_conf is populated by TopologyBuilder (e.g. 
boltDeclarer.setMemoryLoad)
+Map topology_resources = 
backtype.storm.scheduler.resource.Utils.parseResources(bolt
+.getValue().get_common().get_json_conf());
+
backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, 
bolt.getValue().toString(), this.topologyConf);
+for (Map.Entry 
anExecutorToComponent : executorToComponent.entrySet()) {
+if 
(bolt.getKey().equals(anExecutorToComponent.getValue())) {
+_resourceList.put(anExecutorToComponent.getKey(), 
topology_resources);
+}
+}
+}
+} else {
+LOG.warn("Topology " + topologyId + " does not seem to have 
any bolts!");
+}
+// Extract spout memory info
+if (this.topology.get_spouts() != null) {
+for (Map.Entry spout : 
this.topology.get_spouts().entrySet()) {
+Map topology_resources = 
backtype.storm.scheduler.resource.Utils.parseResources(spout
+.getValue().get_common().get_json_conf());
+
backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, 
spout.getValue().toString(), this.topologyConf);
+for (Map.Entry 
anExecutorToComponent : executorToComponent.entrySet()) {
+if 
(spout.getKey().equals(anExecutorToComponent.getValue())) {
+_resourceList.put(anExecutorToComponent.getKey(), 
topology_resources);
+}
+}
+}
+} else {
+LOG.warn("Topology " + topologyId + " does not seem to have 
any spouts!");
--- End diff --

OK, so this is rather odd; this should probably be a warning.




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40731979
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
@@ -31,41 +42,47 @@
 StormTopology topology;
 Map executorToComponent;
 int numWorkers;
- 
+//>>
+private Map> _resourceList;
+//Scheduler this topology should be scheduled with
+private String scheduler;
--- End diff --

This does not appear to be used anywhere.




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40732194
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java ---
@@ -79,11 +96,304 @@ public StormTopology getTopology() {
 ret.put(executor, compId);
 }
 }
-
+
 return ret;
 }
-
+
 public Collection getExecutors() {
 return this.executorToComponent.keySet();
 }
+
+private void initResourceList() {
+_resourceList = new HashMap>();
+// Extract bolt memory info
+if (this.topology.get_bolts() != null) {
+for (Map.Entry bolt : 
this.topology.get_bolts().entrySet()) {
+//the json_conf is populated by TopologyBuilder (e.g. 
boltDeclarer.setMemoryLoad)
+Map topology_resources = 
backtype.storm.scheduler.resource.Utils.parseResources(bolt
+.getValue().get_common().get_json_conf());
+
backtype.storm.scheduler.resource.Utils.checkIntialization(topology_resources, 
bolt.getValue().toString(), this.topologyConf);
+for (Map.Entry 
anExecutorToComponent : executorToComponent.entrySet()) {
+if 
(bolt.getKey().equals(anExecutorToComponent.getValue())) {
+_resourceList.put(anExecutorToComponent.getKey(), 
topology_resources);
+}
+}
+}
+} else {
+LOG.warn("Topology " + topologyId + " does not seem to have 
any bolts!");
--- End diff --

Is this really a warning?  I would almost rather see this removed.




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40731800
  
--- Diff: storm-core/src/jvm/backtype/storm/scheduler/Topologies.java ---
@@ -21,10 +21,17 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import backtype.storm.scheduler.resource.RAS_Component;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class Topologies {
 Map topologies;
 Map nameToId;
-
+Map> _allRAS_Components;
+
+private static final Logger LOG = LoggerFactory.getLogger(Topologies.class);
--- End diff --

This does not look like it is being used.  Not a big deal, but unless we 
use it we should probably remove it.




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40731631
  
--- Diff: storm-core/src/jvm/backtype/storm/Config.java ---
@@ -1101,6 +1127,33 @@
 public static final Object TOPOLOGY_TASKS_SCHEMA = ConfigValidation.IntegerValidator;
 
 /**
+ * The maximum amount of memory an instance of a spout/bolt will take on heap. This enables the scheduler
+ * to allocate slots on machines with enough available memory.
+ */
+public static final String TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB = "topology.component.resources.onheap.memory.mb";
+public static final Object TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB_SCHEMA = ConfigValidation.NonNegativeNumberValidator;
+
+/**
+ * The maximum amount of memory an instance of a spout/bolt will take off heap. This enables the scheduler
+ * to allocate slots on machines with enough available memory.
+ */
+public static final String TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB = "topology.component.resources.offheap.memory.mb";
+public static final Object TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB_SCHEMA = ConfigValidation.NonNegativeNumberValidator;
+
+/**
+ * The config indicates the percentage of cpu for a core. Assuming the a core value to be 100, a
--- End diff --

will fix comment
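
For context, the two memory settings in the diff above could be used along these lines. This is a minimal sketch only: the key strings are copied from the diff, but the class ComponentResourceConfSketch and its methods are hypothetical, and the non-negative check is merely a stand-in for whatever ConfigValidation.NonNegativeNumberValidator actually does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; only the two key strings come from the diff.
final class ComponentResourceConfSketch {
    static final String ONHEAP_MB = "topology.component.resources.onheap.memory.mb";
    static final String OFFHEAP_MB = "topology.component.resources.offheap.memory.mb";

    // Assumed reading of "non-negative number": any Number that is >= 0.
    static boolean isNonNegativeNumber(Object value) {
        return value instanceof Number && ((Number) value).doubleValue() >= 0.0;
    }

    // Sums on-heap and off-heap memory for one component instance.
    static double totalMemoryMb(Map<String, Object> conf) {
        double total = 0.0;
        for (String key : new String[] {ONHEAP_MB, OFFHEAP_MB}) {
            Object value = conf.get(key);
            if (!isNonNegativeNumber(value)) {
                throw new IllegalArgumentException(
                    key + " must be a non-negative number, got " + value);
            }
            total += ((Number) value).doubleValue();
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(ONHEAP_MB, 128.0);
        conf.put(OFFHEAP_MB, 64);
        System.out.println(totalMemoryMb(conf));
    }
}
```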




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40731147
  
--- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java ---
@@ -187,7 +187,7 @@ public static void submitTopology(String name, Map stormConf, StormTopology topo
  * @throws AuthorizationException
  */
--- End diff --

add comment




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40730611
  
--- Diff: storm-core/src/jvm/backtype/storm/generated/AlreadyAliveException.java ---
@@ -51,7 +51,7 @@
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-2-6")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-9-18")
--- End diff --

In general, we try not to commit regenerated Thrift files whose only change is the date.




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread jerrypeng
Github user jerrypeng commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40730558
  
--- Diff: storm-core/src/jvm/backtype/storm/Config.java ---
@@ -167,6 +167,16 @@
 public static final Object STORM_CLUSTER_MODE_SCHEMA = String.class;
 
 /**
+ * What Network Topography detection classes should we use.
+ * Given a list of supervisor hostnames (or IP addresses), this class would return a list of
+ * rack names that correspond to the supervisors. This information is stored in Cluster.java, and
+ * is used in the resource aware scheduler.
+ */
+public static final String STORM_NETWORK_TOPOGRAPHY_PLUGIN = "storm.network.topography.plugin";
+public static final Object STORM_NETWORK_TOPOGRAPHY_PLUGIN_SCHEMA = String.class;
+
+
--- End diff --

Will fix.




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40730552
  
--- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java ---
@@ -442,4 +444,36 @@ public static String submitJar(Map conf, String localJar, ProgressListener liste
  */
 public void onCompleted(String srcFile, String targetFile, long totalBytes);
 }
+
+
+private static void validateConfs(Map stormConf, StormTopology topology) throws IllegalArgumentException {
+LOG.info("Validating storm Confs...");
+double largestMemReq = getMaxExecutorMemoryUsageForTopo(topology, stormConf);
+Double topologyWorkerMaxHeapSize = Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB));
+if(topologyWorkerMaxHeapSize < largestMemReq) {
+throw new IllegalArgumentException("Topology will not be able to be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB="
++ Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB)) + " < "
++ largestMemReq + " (Largest memory requirement of a component in the topology). Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount");
+}
+}
+
+
+private static double getMaxExecutorMemoryUsageForTopo(StormTopology topology, Map topologyConf) {
+double largestMemoryOperator = 0.0;
+for(Map entry : backtype.storm.scheduler.resource.Utils.getBoltsResources(topology, topologyConf).values()) {
--- End diff --

Can we rename backtype.storm.scheduler.resource.Utils to something like 
ResourceUtils instead?  That way there is no conflict with the existing Utils 
import, and I think it will make the code cleaner.
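
A rough sketch of what the rename could buy, assuming a helper class called ResourceUtils as suggested above. Everything here is a stand-in for illustration: the method name, the map layout, the ConfValidation wrapper, and the choice to sum on-heap and off-heap requests are assumptions, since the diff is cut off before that logic. Only the two config key strings come from the diff. The null check is an addition that the quoted validateConfs does not have.

```java
import java.util.Map;

// Hypothetical stand-in for backtype.storm.scheduler.resource.Utils after the
// suggested rename. The method name and map layout are illustrative only.
final class ResourceUtils {
    static final String ONHEAP_MB = "topology.component.resources.onheap.memory.mb";
    static final String OFFHEAP_MB = "topology.component.resources.offheap.memory.mb";

    private ResourceUtils() {}

    // Largest combined (on-heap + off-heap) request across all components.
    // Summing the two values is an assumption, not taken from the PR.
    static double maxMemoryRequestMb(Map<String, Map<String, Double>> resourcesByComponent) {
        double largest = 0.0;
        for (Map<String, Double> resources : resourcesByComponent.values()) {
            double total = resources.getOrDefault(ONHEAP_MB, 0.0)
                    + resources.getOrDefault(OFFHEAP_MB, 0.0);
            largest = Math.max(largest, total);
        }
        return largest;
    }
}

// Hypothetical caller. With a distinct class name there is no clash with
// another class called Utils, so no fully qualified name is needed here.
final class ConfValidation {
    private ConfValidation() {}

    // Same shape as validateConfs in the diff: reject a topology whose largest
    // component cannot fit into a single worker's heap.
    static void validate(Double workerMaxHeapMb,
                         Map<String, Map<String, Double>> resourcesByComponent) {
        if (workerMaxHeapMb == null) {
            return; // no limit configured, so there is nothing to compare against
        }
        double largest = ResourceUtils.maxMemoryRequestMb(resourcesByComponent);
        if (workerMaxHeapMb < largest) {
            throw new IllegalArgumentException("Worker max heap " + workerMaxHeapMb
                    + " MB < largest component requirement " + largest + " MB");
        }
    }
}
```

The call site then reads ResourceUtils.maxMemoryRequestMb(...) instead of spelling out the package name inline.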




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40730370
  
--- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java ---
@@ -442,4 +444,36 @@ public static String submitJar(Map conf, String localJar, ProgressListener liste
  */
 public void onCompleted(String srcFile, String targetFile, long totalBytes);
 }
+
+
+private static void validateConfs(Map stormConf, StormTopology topology) throws IllegalArgumentException {
+LOG.info("Validating storm Confs...");
--- End diff --

I don't think we need this log message.




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40730329
  
--- Diff: storm-core/src/jvm/backtype/storm/StormSubmitter.java ---
@@ -187,7 +187,7 @@ public static void submitTopology(String name, Map stormConf, StormTopology topo
  * @throws AuthorizationException
  */
--- End diff --

If we now throw an IllegalArgumentException, then even though it is a 
RuntimeException we should document it here, and ideally describe the situation 
in which it is thrown.  Otherwise, why declare it at all?
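
As a sketch of the kind of documentation meant here: an unchecked exception can be listed with a Javadoc @throws tag without appearing in the method's throws clause. The class and method below are hypothetical and only mirror the heap-size check from this PR; they are not the StormSubmitter code.

```java
// Hypothetical helper, used only to show the Javadoc convention.
final class SchedulabilityCheck {
    private SchedulabilityCheck() {}

    /**
     * Verifies that the largest component of a topology fits into one worker.
     *
     * @param workerMaxHeapMb       the configured maximum heap size of a worker, in MB
     * @param largestComponentMemMb the largest memory requirement of any component, in MB
     * @throws IllegalArgumentException if {@code workerMaxHeapMb} is smaller than
     *         {@code largestComponentMemMb}, because such a topology could never
     *         be scheduled
     */
    static void checkSchedulable(double workerMaxHeapMb, double largestComponentMemMb) {
        // Unchecked exception: documented above, not declared in a throws clause.
        if (workerMaxHeapMb < largestComponentMemMb) {
            throw new IllegalArgumentException("Worker max heap " + workerMaxHeapMb
                    + " MB < largest component requirement " + largestComponentMemMb + " MB");
        }
    }
}
```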




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40730129
  
--- Diff: storm-core/src/jvm/backtype/storm/Config.java ---
@@ -1101,6 +1127,33 @@
 public static final Object TOPOLOGY_TASKS_SCHEMA = ConfigValidation.IntegerValidator;
 
 /**
+ * The maximum amount of memory an instance of a spout/bolt will take on heap. This enables the scheduler
+ * to allocate slots on machines with enough available memory.
+ */
+public static final String TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB = "topology.component.resources.onheap.memory.mb";
+public static final Object TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB_SCHEMA = ConfigValidation.NonNegativeNumberValidator;
+
+/**
+ * The maximum amount of memory an instance of a spout/bolt will take off heap. This enables the scheduler
+ * to allocate slots on machines with enough available memory.
+ */
+public static final String TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB = "topology.component.resources.offheap.memory.mb";
+public static final Object TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB_SCHEMA = ConfigValidation.NonNegativeNumberValidator;
+
+/**
+ * The config indicates the percentage of cpu for a core. Assuming the a core value to be 100, a
--- End diff --

This is a bit confusing. I think you mean the percentage of a CPU core that a 
component will take.

All of these configs should probably mention that the value is the default 
for components that don't override it.
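
To make the "default unless overridden" semantics concrete, here is a minimal sketch of how a lookup could work. The class, the method, and the idea of a separate per-component map are assumptions for illustration. The key name is also an assumption, because the diff is truncated before the constant is declared. A value of 100 is taken to mean one full core, so 50 would be half a core.

```java
import java.util.Map;

// Hypothetical lookup helper; not part of the PR.
final class ComponentCpu {
    // Assumed key name: the quoted diff ends before the constant is declared.
    static final String CPU_PCORE_PERCENT = "topology.component.cpu.pcore.percent";

    private ComponentCpu() {}

    // Returns the component's own setting if present, otherwise the
    // topology-wide default. 100 stands for one full CPU core.
    static double cpuPercentFor(Map<String, Object> topologyConf,
                                Map<String, Object> componentConf) {
        Object value = componentConf.containsKey(CPU_PCORE_PERCENT)
                ? componentConf.get(CPU_PCORE_PERCENT)
                : topologyConf.get(CPU_PCORE_PERCENT);
        if (!(value instanceof Number)) {
            throw new IllegalArgumentException(CPU_PCORE_PERCENT + " must be set to a number");
        }
        double percent = ((Number) value).doubleValue();
        if (percent < 0) {
            throw new IllegalArgumentException(CPU_PCORE_PERCENT + " must not be negative");
        }
        return percent;
    }
}
```

If the Javadoc states this fallback explicitly, users can tell whether setting the topology-level value affects components that already carry their own request.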




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40729915
  
--- Diff: storm-core/src/jvm/backtype/storm/Config.java ---
@@ -167,6 +167,16 @@
 public static final Object STORM_CLUSTER_MODE_SCHEMA = String.class;
 
 /**
+ * What Network Topography detection classes should we use.
+ * Given a list of supervisor hostnames (or IP addresses), this class would return a list of
+ * rack names that correspond to the supervisors. This information is stored in Cluster.java, and
+ * is used in the resource aware scheduler.
+ */
+public static final String STORM_NETWORK_TOPOGRAPHY_PLUGIN = "storm.network.topography.plugin";
+public static final Object STORM_NETWORK_TOPOGRAPHY_PLUGIN_SCHEMA = String.class;
+
+
--- End diff --

There is an extra blank line here.
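
For readers following the Javadoc in this diff, a minimal sketch of the contract it describes: a list of supervisor hostnames goes in and a list of rack names comes out, one per host. The interface and class names below are hypothetical. The plugin interface the PR actually expects is not shown in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plugin contract, modelled on the Javadoc in the diff.
interface RackResolver {
    // Returns one rack name per supervisor host, in the same order.
    List<String> resolve(List<String> supervisorHosts);
}

// Simplest possible implementation: every host is placed on one default rack.
final class SingleRackResolver implements RackResolver {
    static final String DEFAULT_RACK = "default-rack";

    @Override
    public List<String> resolve(List<String> supervisorHosts) {
        List<String> racks = new ArrayList<>(supervisorHosts.size());
        for (int i = 0; i < supervisorHosts.size(); i++) {
            racks.add(DEFAULT_RACK);
        }
        return racks;
    }
}
```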




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/746#discussion_r40729530
  
--- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
@@ -308,15 +308,15 @@
 (defn- all-supervisor-info
   ([storm-cluster-state] (all-supervisor-info storm-cluster-state nil))
   ([storm-cluster-state callback]
- (let [supervisor-ids (.supervisors storm-cluster-state callback)]
-   (into {}
- (mapcat
-  (fn [id]
-(if-let [info (.supervisor-info storm-cluster-state id)]
-  [[id info]]
-  ))
-  supervisor-ids))
-   )))
+(let [supervisor-ids (.supervisors storm-cluster-state callback)]
+  (into {}
+(mapcat
+  (fn [id]
+(if-let [info (.supervisor-info storm-cluster-state id)]
+  [[id info]]
+  ))
+  supervisor-ids))
+  )))
--- End diff --

If these are just whitespace changes, I think we should probably revert them.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread revans2
Github user revans2 commented on the pull request:

https://github.com/apache/storm/pull/746#issuecomment-144190888
  
Well, first of all, you need to upmerge soon.




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-29 Thread jerrypeng
Github user jerrypeng commented on the pull request:

https://github.com/apache/storm/pull/746#issuecomment-144111799
  
@HeartSaVioR  
@revans2 
Can I get a review of my PR? Thanks!




[GitHub] storm pull request: [STORM-893] Resource Aware Scheduling

2015-09-18 Thread jerrypeng
GitHub user jerrypeng opened a pull request:

https://github.com/apache/storm/pull/746

[STORM-893] Resource Aware Scheduling

I have created an initial open source implementation of the Resource Aware 
Scheduler as described in the paper I published: 

web.engr.illinois.edu/~bpeng/files/r-storm.pdf

The paper describes the general architecture, concepts, and algorithms used.

I have written an example topology that demonstrates how to use the API I 
have written to specify resource requirements in your topology.  Currently the 
user can only specify two types of resources: Memory and CPU.  We plan on 
adding support for more resources in the future.

Currently, there is no built-in enforcement of resource usage in worker 
processes. However, we plan to add that functionality via CGroups.

People that worked on the implementation at Yahoo with me:

Bobby Evans (Yahoo & Storm PMC)
Derek Dagit (Yahoo & Storm PMC)
Kyle Nusbaum (Yahoo & Storm PMC)
Liu Zhuo (Yahoo)
Sanket Chintapalli (Yahoo)
Reza Farivar (Yahoo & UIUC)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jerrypeng/storm opensource_ras

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/746.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #746


commit e014804a68b9c6887c64b9d03a17929863b80909
Author: Boyang Jerry Peng 
Date:   2015-09-17T20:50:44Z

[STORM-893] - Resource Aware Scheduler implementation
[STORM-894] - Basic Resource Aware Scheduling implementation.

commit 0f3f237f158218cbe46ffc59670f300875eb7950
Author: Boyang Jerry Peng 
Date:   2015-09-17T20:02:44Z

Added functionality for users to limit the amount of memory resources 
allocated to a worker (JVM) process when scheduling with resource aware 
scheduler. This allows users to potentially spread executors more evenly across 
workers.
Also refactored some code

commit 28ee867c5a27c220be562689e61f4bb959a1aa62
Author: Boyang Jerry Peng 
Date:   2015-09-17T20:56:23Z

regenerated thrift

commit 0256f6baff63e8c394ebfddbb68d38de5893b053
Author: Boyang Jerry Peng 
Date:   2015-09-18T18:23:46Z

adding miscellaneous things
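
The second commit above mentions that capping worker memory can spread executors more evenly across workers. The sketch below illustrates that effect with a plain first-fit packing. It is not the scheduler's actual algorithm, and the class and method names are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: first-fit packing of executors into workers under a heap cap.
final class WorkerPacking {
    private WorkerPacking() {}

    // Places each executor into the first worker that still has enough heap
    // left, opening a new worker when none fits. Returns the memory requests
    // grouped by worker.
    static List<List<Double>> pack(List<Double> executorMemMb, double workerMaxHeapMb) {
        List<List<Double>> workers = new ArrayList<>();
        List<Double> remaining = new ArrayList<>(); // heap left in each worker
        for (double mem : executorMemMb) {
            if (mem > workerMaxHeapMb) {
                throw new IllegalArgumentException("Executor needs " + mem
                        + " MB but a worker is capped at " + workerMaxHeapMb + " MB");
            }
            int target = -1;
            for (int i = 0; i < workers.size(); i++) {
                if (remaining.get(i) >= mem) {
                    target = i;
                    break;
                }
            }
            if (target < 0) {
                workers.add(new ArrayList<Double>());
                remaining.add(workerMaxHeapMb);
                target = workers.size() - 1;
            }
            workers.get(target).add(mem);
            remaining.set(target, remaining.get(target) - mem);
        }
        return workers;
    }
}
```

With four executors of 256 MB each, a 1024 MB cap puts all of them into one worker, while a 512 MB cap splits them across two.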



