[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-11-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2385


---


2017-11-07 Thread govind-menon
Github user govind-menon commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149523415
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
 ---
@@ -18,93 +18,58 @@
 
 package org.apache.storm.scheduler.resource.strategies.scheduling;
 
-import com.google.common.annotations.VisibleForTesting;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.TreeSet;
-
 import org.apache.storm.Config;
-import org.apache.storm.generated.ComponentType;
-import org.apache.storm.scheduler.Cluster;
-import org.apache.storm.scheduler.Component;
-import org.apache.storm.scheduler.ExecutorDetails;
-import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.WorkerSlot;
-import org.apache.storm.scheduler.resource.RAS_Node;
-import org.apache.storm.scheduler.resource.RAS_Nodes;
+import org.apache.storm.scheduler.*;
 import org.apache.storm.scheduler.resource.ResourceUtils;
 import org.apache.storm.scheduler.resource.SchedulingResult;
 import org.apache.storm.scheduler.resource.SchedulingStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class DefaultResourceAwareStrategy implements IStrategy {
-private static final Logger LOG = 
LoggerFactory.getLogger(DefaultResourceAwareStrategy.class);
-private Cluster cluster;
-private Map networkTopography;
-private RAS_Nodes nodes;
+import java.util.*;
 
-@VisibleForTesting
-void prepare(Cluster cluster) {
-this.cluster = cluster;
-nodes = new RAS_Nodes(cluster);
-networkTopography = cluster.getNetworkTopography();
-logClusterInfo();
-}
+public class DefaultResourceAwareStrategy extends 
BaseResourceAwareStrategy implements IStrategy {
--- End diff --

I think that should come in later. Ideally there should be a release 
without GRAS on by default, so people can try it out themselves.


---


2017-11-07 Thread govind-menon
Github user govind-menon commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149511408
  
--- Diff: storm-client/src/storm.thrift ---
@@ -496,6 +496,8 @@ struct WorkerResources {
 3: optional double cpu;
 4: optional double shared_mem_on_heap; //This is just for accounting 
mem_on_heap should be used for enforcement
 5: optional double shared_mem_off_heap; //This is just for accounting 
mem_off_heap should be used for enforcement
+6: optional map resources; // Generic resources Map
+7: optional map shared_resources; // Shared Generic 
resources Map
--- End diff --

We're definitely using it.


---


2017-11-06 Thread govind-menon
Github user govind-menon commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149225052
  
--- Diff: 
examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java 
---
@@ -430,37 +427,39 @@ public static void main(String[] args) throws 
Exception {
 return topologyResources;
 }
 
-static void checkIntialization(Map topologyResources, 
String com,
-  Map 
topologyConf) {
-checkInitMem(topologyResources, com, topologyConf);
-checkInitCpu(topologyResources, com, topologyConf);
-}
+/**
+ * Checks if the topology's resource requirements are initialized.
+ * @param topologyResources map of resources requirements
--- End diff --

@revans2 I agree; I was just following how checkInitialization was 
behaving before.


---


2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149143172
  
--- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
@@ -241,6 +241,12 @@
 public static final String TOPOLOGY_TASKS = "topology.tasks";
 
 /**
+ * A map of resources used by each component e.g {"cpu" : 200.0, "onheap.memory.mb": 256.0, "gpu" : 0.5 }
--- End diff --

Do we actually want users setting `cpu` as the name of one of the 
resources? The code looks for `cpu.pcore.percent` or 
`topology.component.cpu.pcore.percent`, and I really think this is going to 
confuse our users, especially if we are also telling them to use this for the 
`SUPERVISOR_RESOURCES_MAP`.

Can we please make sure that the comments here match what we would 
expect the user to actually set, and ideally follow whatever naming 
convention we want them to use?


---


2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149161951
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
@@ -556,6 +605,9 @@ public boolean wouldFit(
 }
 
 double memoryAdded = afterTotal - currentTotal;
+double memoryAvailable = ObjectReader.getDouble(
+
resourcesAvailable.getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 
null), 0.0);
--- End diff --

Same as above: `getOrDefault(key, null)` behaves exactly like `get(key)`, 
so just make it a `get`.


---


2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149164076
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
 ---
@@ -129,41 +137,65 @@ public static String 
getJsonWithUpdatedResources(String jsonConf, Map 
topologyResources, String com,
-  Map 
topologyConf) {
-checkInitMem(topologyResources, com, topologyConf);
-checkInitCpu(topologyResources, com, topologyConf);
-}
+public static void checkInitialization(Map 
topologyResources, String componentId, Map topologyConf) {
+StringBuilder msgBuilder = new StringBuilder();
 
-private static void checkInitMem(Map 
topologyResources, String com,
-Map topologyConf) {
-if 
(!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB))
 {
-Double onHeap = ObjectReader.getDouble(
-
topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
-if (onHeap != null) {
-
topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 
onHeap);
-debugMessage("ONHEAP", com, topologyConf);
-}
+Set resourceNameSet = new HashSet<>();
+
+resourceNameSet.add(
+Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT
+);
+resourceNameSet.add(
+Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB
+);
+resourceNameSet.add(
+Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB
+);
+
+Map topologyComponentResourcesMap =
+(Map) topologyConf.getOrDefault(
+Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, 
Collections.emptyMap());
+
+resourceNameSet.addAll(topologyResources.keySet());
+resourceNameSet.addAll(topologyComponentResourcesMap.keySet());
+
+for (String resourceName : resourceNameSet) {
+msgBuilder.append(checkInitResource(topologyResources, 
topologyConf, topologyComponentResourcesMap, resourceName));
 }
-if 
(!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB))
 {
-Double offHeap = ObjectReader.getDouble(
-
topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
-if (offHeap != null) {
-
topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 
offHeap);
-debugMessage("OFFHEAP", com, topologyConf);
-}
+
+Map normalizedTopologyResources = 
normalizedResourceMap(topologyResources);
+topologyResources.clear();
+topologyResources.putAll(normalizedTopologyResources);
+
+if (msgBuilder.length() > 0) {
+String resourceDefaults = msgBuilder.toString();
+LOG.debug(
+"Unable to extract resource requirement for Component 
{} \n Resources : {}",
+componentId, resourceDefaults);
 }
 }
 
-private static void checkInitCpu(Map 
topologyResources, String com,
- Map topologyConf) {
-if 
(!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
-Double cpu = 
ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT),
 null);
-if (cpu != null) {
-
topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, cpu);
-debugMessage("CPU", com, topologyConf);
+private static String checkInitResource(Map 
topologyResources, Map topologyConf,
+Map 
topologyComponentResourcesMap, String resourceName) {
+StringBuilder msgBuilder = new StringBuilder();
+String normalizedResourceName = 
resourceNameMapping.getOrDefault(resourceName, resourceName);
+if (!topologyResources.containsKey(normalizedResourceName)) {
+if (topologyConf.containsKey(resourceName)) {
+Double resourceValue = 
ObjectReader.getDouble(topologyConf.get(resourceName));
+if(resourceValue!=null) {
+topologyResources.put(normalizedResourceName, 
resourceValue);

[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149162901
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java ---
@@ -289,17 +300,13 @@ public Double getTotalMemReqTask(ExecutorDetails 
exec) {
 
 /**
  * Gets the total memory resource list for a set of tasks that is part 
of a topology.
- * @return Map a map of the total memory 
requirement for all tasks in topology topoId.
+ * @param executors
+ * @return Map ,
--- End diff --

The type is not needed here, and I think it all works on a single line.


---


2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149162982
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java ---
@@ -289,17 +300,13 @@ public Double getTotalMemReqTask(ExecutorDetails 
exec) {
 
 /**
  * Gets the total memory resource list for a set of tasks that is part 
of a topology.
- * @return Map a map of the total memory 
requirement for all tasks in topology topoId.
+ * @param executors
--- End diff --

nit: having a param with no explanation is not really helpful.


---


2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149161531
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
@@ -503,43 +553,42 @@ public boolean wouldFit(
 WorkerSlot ws,
 ExecutorDetails exec,
 TopologyDetails td,
-double maxHeap,
-double memoryAvailable,
-double cpuAvailable) {
-//NOTE this is called lots and lots by schedulers, so anything we 
can do to make it faster is going to help a lot.
-//CPU is simplest because it does not have odd interactions.
-double cpuNeeded = td.getTotalCpuReqTask(exec);
-if (cpuNeeded > cpuAvailable) {
-if (LOG.isTraceEnabled()) {
-LOG.trace("Could not schedule {}:{} on {} not enough CPU 
{} > {}",
-td.getName(),
-exec,
-ws,
-cpuNeeded,
-cpuAvailable);
-}
-//Not enough CPU no need to try any more
-return false;
-}
+Map resourcesAvailable,
+double maxHeap) {
 
-//Lets see if we can make the Memory one fast too, at least in the 
failure case.
-//The totalMemReq is not really that accurate because it does not 
include shared memory, but if it does not fit we know
-// Even with shared it will not work
-double minMemNeeded = td.getTotalMemReqTask(exec);
-if (minMemNeeded > memoryAvailable) {
-if (LOG.isTraceEnabled()) {
-LOG.trace("Could not schedule {}:{} on {} not enough Mem 
{} > {}", td.getName(), exec, ws, minMemNeeded, memoryAvailable);
+Map requestedResources = 
td.getTotalResources(exec);
+
+for (Entry resourceNeededEntry : requestedResources.entrySet()) {
+String resourceName = resourceNeededEntry.getKey().toString();
--- End diff --

Why are we calling toString on a String?
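For illustration, a minimal sketch of what the generic loop in `wouldFit` reduces to once the map is typed as `Map<String, Double>`: the key is already a `String`, so no `toString()` is needed. The class name `GenericFitCheck`, the flat `available` map, and the resource names `memory.mb` and `gpu.count` are hypothetical, and the sketch ignores the heap-limit and shared-memory handling the real method has to do.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified sketch; not Storm's actual Cluster.wouldFit.
final class GenericFitCheck {

    /** Returns true if every requested resource fits in what the node has left. */
    static boolean wouldFit(Map<String, Double> requested, Map<String, Double> available) {
        for (Map.Entry<String, Double> entry : requested.entrySet()) {
            String resourceName = entry.getKey(); // already a String, no toString() needed
            double needed = entry.getValue();
            // A resource the node does not advertise counts as 0.0 available.
            double free = available.getOrDefault(resourceName, 0.0);
            if (needed > free) {
                return false; // fail fast on the first resource that does not fit
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Double> available = new HashMap<>();
        available.put("cpu.pcore.percent", 100.0);
        available.put("memory.mb", 1024.0);

        Map<String, Double> request = new HashMap<>();
        request.put("cpu.pcore.percent", 50.0);
        System.out.println(wouldFit(request, available)); // true

        request.put("gpu.count", 1.0);
        System.out.println(wouldFit(request, available)); // false: no GPU on this node
    }
}
```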


---


2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149162640
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java ---
@@ -133,30 +136,31 @@ public StormTopology getTopology() {
 
 private void initResourceList() {
 this.resourceList = new HashMap<>();
-// Extract bolt memory info
+// Extract bolt resource info
 if (topology.get_bolts() != null) {
 for (Map.Entry bolt : 
topology.get_bolts().entrySet()) {
 //the json_conf is populated by TopologyBuilder (e.g. 
boltDeclarer.setMemoryLoad)
 Map topologyResources =
 
ResourceUtils.parseResources(bolt.getValue().get_common().get_json_conf());
-ResourceUtils.checkIntialization(topologyResources, 
bolt.getKey(), topologyConf);
+ResourceUtils.checkInitialization(topologyResources, 
bolt.getKey(), this.topologyConf);
 for (Map.Entry 
anExecutorToComponent :
 executorToComponent.entrySet()) {
-if 
(bolt.getKey().equals(anExecutorToComponent.getValue())) {
+if 
(bolt.getKey().equals(anExecutorToComponent.getValue())  && 
topologyResources.keySet().size() > 0) {
--- End diff --

nit: `!topologyResources.isEmpty()` is a bit cleaner.


---


2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149164720
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
 ---
@@ -238,4 +277,57 @@ public static double sum(Collection list) {
 public static double avg(Collection list) {
 return sum(list) / list.size();
 }
+
+/**
+ * Normalizes a supervisor resource map or topology details map's keys 
to universal resource names.
+ * @param resourceMap resource map of either Supervisor or Topology
+ * @return the resource map with common resource names
+ */
+public static Map normalizedResourceMap(Map resourceMap) {
+Map result = new HashMap();
+
+result.putAll(resourceMap);
+for(Map.Entry entry: resourceMap.entrySet()) {
+if (resourceNameMapping.containsKey(entry.getKey())) {
+result.put(resourceNameMapping.get(entry.getKey()), 
ObjectReader.getDouble(entry.getValue(), 0.0));
+result.remove(entry.getKey());
+}
+}
+return result;
+}
+
+public static Map addResources(Map 
resourceMap1, Map resourceMap2) {
+Map result = new HashMap();
+
+result.putAll(resourceMap1);
+
+for(Map.Entry entry: resourceMap2.entrySet()) {
--- End diff --

nit: space after the `for`.


---


2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149164027
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
 ---
@@ -129,41 +137,65 @@ public static String 
getJsonWithUpdatedResources(String jsonConf, Map 
topologyResources, String com,
-  Map 
topologyConf) {
-checkInitMem(topologyResources, com, topologyConf);
-checkInitCpu(topologyResources, com, topologyConf);
-}
+public static void checkInitialization(Map 
topologyResources, String componentId, Map topologyConf) {
+StringBuilder msgBuilder = new StringBuilder();
 
-private static void checkInitMem(Map 
topologyResources, String com,
-Map topologyConf) {
-if 
(!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB))
 {
-Double onHeap = ObjectReader.getDouble(
-
topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
-if (onHeap != null) {
-
topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 
onHeap);
-debugMessage("ONHEAP", com, topologyConf);
-}
+Set resourceNameSet = new HashSet<>();
+
+resourceNameSet.add(
+Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT
+);
+resourceNameSet.add(
+Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB
+);
+resourceNameSet.add(
+Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB
+);
+
+Map topologyComponentResourcesMap =
+(Map) topologyConf.getOrDefault(
+Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, 
Collections.emptyMap());
+
+resourceNameSet.addAll(topologyResources.keySet());
+resourceNameSet.addAll(topologyComponentResourcesMap.keySet());
+
+for (String resourceName : resourceNameSet) {
+msgBuilder.append(checkInitResource(topologyResources, 
topologyConf, topologyComponentResourcesMap, resourceName));
 }
-if 
(!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB))
 {
-Double offHeap = ObjectReader.getDouble(
-
topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
-if (offHeap != null) {
-
topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 
offHeap);
-debugMessage("OFFHEAP", com, topologyConf);
-}
+
+Map normalizedTopologyResources = 
normalizedResourceMap(topologyResources);
+topologyResources.clear();
+topologyResources.putAll(normalizedTopologyResources);
+
+if (msgBuilder.length() > 0) {
+String resourceDefaults = msgBuilder.toString();
+LOG.debug(
+"Unable to extract resource requirement for Component 
{} \n Resources : {}",
+componentId, resourceDefaults);
 }
 }
 
-private static void checkInitCpu(Map 
topologyResources, String com,
- Map topologyConf) {
-if 
(!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
-Double cpu = 
ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT),
 null);
-if (cpu != null) {
-
topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, cpu);
-debugMessage("CPU", com, topologyConf);
+private static String checkInitResource(Map 
topologyResources, Map topologyConf,
+Map 
topologyComponentResourcesMap, String resourceName) {
+StringBuilder msgBuilder = new StringBuilder();
+String normalizedResourceName = 
resourceNameMapping.getOrDefault(resourceName, resourceName);
+if (!topologyResources.containsKey(normalizedResourceName)) {
+if (topologyConf.containsKey(resourceName)) {
+Double resourceValue = 
ObjectReader.getDouble(topologyConf.get(resourceName));
+if(resourceValue!=null) {
--- End diff --

nit: space after the if.


---


2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149162183
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
@@ -993,6 +1045,31 @@ public WorkerResources getWorkerResources(WorkerSlot 
ws) {
 }
 
 @Override
+public Map getAllScheduledResourcesForNode(String 
nodeId) {
+Map totalScheduledResources = new HashMap<>();
+for (SchedulerAssignmentImpl assignment : assignments.values()) {
+for (Entry entry :
+assignment.getScheduledResources().entrySet()) {
+if (nodeId.equals(entry.getKey().getNodeId())) {
+WorkerResources resources = entry.getValue();
+for (Map.Entry resourceEntry : 
resources.get_resources().entrySet()) {
+Double currentResourceValue = 
totalScheduledResources.containsKey(resourceEntry.getKey()) ? 
totalScheduledResources.get(resourceEntry.getKey()) : 0.0;
+
totalScheduledResources.put(resourceEntry.getKey().toString(), 
currentResourceValue + ObjectReader.getDouble(resourceEntry.getValue()));
--- End diff --

Why are we calling toString on a String?


---


2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149162313
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
@@ -993,6 +1045,31 @@ public WorkerResources getWorkerResources(WorkerSlot 
ws) {
 }
 
 @Override
+public Map getAllScheduledResourcesForNode(String 
nodeId) {
+Map totalScheduledResources = new HashMap<>();
+for (SchedulerAssignmentImpl assignment : assignments.values()) {
+for (Entry entry :
+assignment.getScheduledResources().entrySet()) {
+if (nodeId.equals(entry.getKey().getNodeId())) {
+WorkerResources resources = entry.getValue();
+for (Map.Entry resourceEntry : 
resources.get_resources().entrySet()) {
+Double currentResourceValue = 
totalScheduledResources.containsKey(resourceEntry.getKey()) ? 
totalScheduledResources.get(resourceEntry.getKey()) : 0.0;
--- End diff --

This is actually the perfect place to call getOrDefault.
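A minimal sketch of the per-node accumulation written with `getOrDefault`, assuming a simplified, hypothetical `Worker` type in place of the `SchedulerAssignmentImpl` / `WorkerResources` pair in the diff. `getOrDefault` replaces the `containsKey(...) ? get(...) : 0.0` ternary.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified sketch of summing the resources scheduled on one node.
final class NodeResourceTotals {

    /** Stand-in for a scheduled worker: the node it runs on and its resources. */
    static final class Worker {
        final String nodeId;
        final Map<String, Double> resources;

        Worker(String nodeId, Map<String, Double> resources) {
            this.nodeId = nodeId;
            this.resources = resources;
        }
    }

    static Map<String, Double> totalForNode(String nodeId, List<Worker> workers) {
        Map<String, Double> total = new HashMap<>();
        for (Worker worker : workers) {
            if (!nodeId.equals(worker.nodeId)) {
                continue;
            }
            for (Map.Entry<String, Double> e : worker.resources.entrySet()) {
                // Replaces the containsKey(...) ? get(...) : 0.0 ternary.
                double current = total.getOrDefault(e.getKey(), 0.0);
                total.put(e.getKey(), current + e.getValue());
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<Worker> workers = Arrays.asList(
            new Worker("node-1", Collections.singletonMap("gpu.count", 1.0)),
            new Worker("node-1", Collections.singletonMap("gpu.count", 2.0)),
            new Worker("node-2", Collections.singletonMap("gpu.count", 4.0)));
        System.out.println(totalForNode("node-1", workers)); // {gpu.count=3.0}
    }
}
```

`total.merge(e.getKey(), e.getValue(), Double::sum)` would collapse the read and the write into one call; `getOrDefault` stays closest to the code under review.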


---


2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149163670
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
 ---
@@ -57,7 +64,8 @@
 if (topology.get_spouts() != null) {
 for (Map.Entry spout : 
topology.get_spouts().entrySet()) {
 Map topologyResources = 
parseResources(spout.getValue().get_common().get_json_conf());
-checkIntialization(topologyResources, 
spout.getValue().toString(), topologyConf);
+checkInitialization(topologyResources, spout.getKey(), 
topologyConf);
+LOG.warn("Turned {} into {}", 
spout.getValue().get_common().get_json_conf(), topologyResources);
--- End diff --

Can we delete this and the one on line 54 too? It looks like a debug 
statement that accidentally slipped into the code. Truth be told, I think it is 
my debug statement that slipped into this...


---


2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149164648
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
 ---
@@ -238,4 +277,57 @@ public static double sum(Collection list) {
 public static double avg(Collection list) {
 return sum(list) / list.size();
 }
+
+/**
+ * Normalizes a supervisor resource map or topology details map's keys 
to universal resource names.
+ * @param resourceMap resource map of either Supervisor or Topology
+ * @return the resource map with common resource names
+ */
+public static Map normalizedResourceMap(Map resourceMap) {
+Map result = new HashMap();
+
+result.putAll(resourceMap);
+for(Map.Entry entry: resourceMap.entrySet()) {
--- End diff --

nit: space after the for.
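A sketch of `normalizedResourceMap` with the generics and `for (` spacing restored. The single mapping entry is an assumption based on the key names mentioned earlier in this thread, and `toDouble` is a hypothetical stand-in for `ObjectReader.getDouble`. Unlike the diff, which copies unmapped values through unchanged, this variant converts every value to a double. It writes renamed keys in a second pass so that, as in the diff, a legacy key overrides an existing entry under its common name regardless of `HashMap` iteration order.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of normalizedResourceMap with generics restored.
final class ResourceNameNormalizer {

    // Assumed mapping from a legacy config key to a common resource name.
    static final Map<String, String> RESOURCE_NAME_MAPPING = new HashMap<>();

    static {
        RESOURCE_NAME_MAPPING.put("topology.component.cpu.pcore.percent", "cpu.pcore.percent");
    }

    static Map<String, Double> normalizedResourceMap(Map<String, ?> resourceMap) {
        Map<String, Double> result = new HashMap<>();
        // First pass: keys that need no renaming.
        for (Map.Entry<String, ?> entry : resourceMap.entrySet()) {
            if (!RESOURCE_NAME_MAPPING.containsKey(entry.getKey())) {
                result.put(entry.getKey(), toDouble(entry.getValue(), 0.0));
            }
        }
        // Second pass: renamed keys overwrite, so the legacy value wins.
        for (Map.Entry<String, ?> entry : resourceMap.entrySet()) {
            String commonName = RESOURCE_NAME_MAPPING.get(entry.getKey());
            if (commonName != null) {
                result.put(commonName, toDouble(entry.getValue(), 0.0));
            }
        }
        return result;
    }

    // Stand-in for ObjectReader.getDouble: any Number becomes a double, anything else the default.
    static double toDouble(Object value, double defaultValue) {
        return value instanceof Number ? ((Number) value).doubleValue() : defaultValue;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.component.cpu.pcore.percent", 50);
        conf.put("gpu.count", 1);
        System.out.println(normalizedResourceMap(conf).get("cpu.pcore.percent")); // 50.0
    }
}
```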


---


2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149163756
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
 ---
@@ -129,41 +137,65 @@ public static String 
getJsonWithUpdatedResources(String jsonConf, Map 
topologyResources, String com,
-  Map 
topologyConf) {
-checkInitMem(topologyResources, com, topologyConf);
-checkInitCpu(topologyResources, com, topologyConf);
-}
+public static void checkInitialization(Map 
topologyResources, String componentId, Map topologyConf) {
--- End diff --

Why are we losing the type hints on topologyConf?


---


2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149140827
  
--- Diff: 
examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java 
---
@@ -430,37 +427,39 @@ public static void main(String[] args) throws 
Exception {
 return topologyResources;
 }
 
-static void checkIntialization(Map topologyResources, 
String com,
-  Map 
topologyConf) {
-checkInitMem(topologyResources, com, topologyConf);
-checkInitCpu(topologyResources, com, topologyConf);
-}
+/**
+ * Checks if the topology's resource requirements are initialized.
+ * @param topologyResources map of resources requirements
--- End diff --

nit: could we add a comment that topologyResources will be modified if it is 
not properly initialized? Just so it is clear, because modifying an argument 
is not a typical thing to do in a Java method.
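One way the requested note could look, shown on a simplified, hypothetical `fillMissingResources` method. It follows the fallback visible in the `checkInitResource` diff (use the topology-level value only when the component did not declare the resource) but skips name normalization; the method name and resource names are assumptions.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; documents that the first argument is mutated.
final class ResourceDefaults {

    /**
     * Fills in resource requirements that the component did not declare,
     * using topology-level defaults.
     *
     * <p><b>Note:</b> {@code topologyResources} is modified in place: any
     * resource missing from it but present in {@code topologyConf} is added.
     *
     * @param topologyResources the component's resource requirements (mutated)
     * @param topologyConf      topology configuration holding default values
     * @param resourceNames     the resource names to check
     */
    static void fillMissingResources(Map<String, Double> topologyResources,
                                     Map<String, Object> topologyConf,
                                     Collection<String> resourceNames) {
        for (String name : resourceNames) {
            Object fallback = topologyConf.get(name);
            if (!topologyResources.containsKey(name) && fallback instanceof Number) {
                topologyResources.put(name, ((Number) fallback).doubleValue());
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Double> resources = new HashMap<>();
        resources.put("cpu.pcore.percent", 20.0);
        Map<String, Object> conf = new HashMap<>();
        conf.put("cpu.pcore.percent", 100);
        conf.put("memory.mb", 256);
        fillMissingResources(resources, conf, Arrays.asList("cpu.pcore.percent", "memory.mb"));
        System.out.println(resources.get("cpu.pcore.percent") + " " + resources.get("memory.mb")); // 20.0 256.0
    }
}
```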


---


2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149160853
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
 ---
@@ -73,11 +75,23 @@ private SupervisorInfo buildSupervisorInfo(Map conf, Supervisor
 
 private Map mkSupervisorCapacities(Map 
conf) {
 Map ret = new HashMap();
+// Put in legacy values
 Double mem = 
ObjectReader.getDouble(conf.get(Config.SUPERVISOR_MEMORY_CAPACITY_MB), 4096.0);
 ret.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem);
 Double cpu = 
ObjectReader.getDouble(conf.get(Config.SUPERVISOR_CPU_CAPACITY), 400.0);
 ret.put(Config.SUPERVISOR_CPU_CAPACITY, cpu);
-return ret;
+
+
+// If configs are present in Generic map and legacy - the legacy 
values will be overwritten
+Map resourcesMap = (Map) 
conf.get(Config.SUPERVISOR_RESOURCES_MAP);
--- End diff --

This is not guaranteed to be a Double; it is only guaranteed to be a Number. 
So we need to convert it with something like:

```
if (resourcesMap != null) {
    for (Map.Entry<String, Number> stringNumberEntry : resourcesMap.entrySet()) {
        ret.put(stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue());
    }
}
```
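A self-contained sketch of the whole `mkSupervisorCapacities` flow with that conversion applied. The three key strings are hypothetical stand-ins for the `Config` constants; the defaults of 4096.0 and 400.0 come from the diff. A plain `(Double)` cast on the values would throw `ClassCastException` for the `Integer` values used below.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of building supervisor capacities from legacy and generic configs.
final class SupervisorCapacities {

    // Hypothetical key names standing in for the Config constants in the diff.
    static final String MEMORY_CAPACITY_KEY = "supervisor.memory.capacity.mb";
    static final String CPU_CAPACITY_KEY = "supervisor.cpu.capacity";
    static final String RESOURCES_MAP_KEY = "supervisor.resources.map";

    @SuppressWarnings("unchecked")
    static Map<String, Double> mkSupervisorCapacities(Map<String, Object> conf) {
        Map<String, Double> ret = new HashMap<>();
        // Legacy values first, with the same defaults as the diff.
        ret.put(MEMORY_CAPACITY_KEY, toDouble(conf.get(MEMORY_CAPACITY_KEY), 4096.0));
        ret.put(CPU_CAPACITY_KEY, toDouble(conf.get(CPU_CAPACITY_KEY), 400.0));

        // Entries in the generic map override legacy values with the same key.
        Map<String, Number> resourcesMap = (Map<String, Number>) conf.get(RESOURCES_MAP_KEY);
        if (resourcesMap != null) {
            for (Map.Entry<String, Number> entry : resourcesMap.entrySet()) {
                // The value may be any Number subtype, e.g. an Integer, so convert explicitly.
                ret.put(entry.getKey(), entry.getValue().doubleValue());
            }
        }
        return ret;
    }

    static double toDouble(Object value, double defaultValue) {
        return value instanceof Number ? ((Number) value).doubleValue() : defaultValue;
    }

    public static void main(String[] args) {
        Map<String, Number> generic = new HashMap<>();
        generic.put("gpu.count", 2);        // an Integer, not a Double
        generic.put(CPU_CAPACITY_KEY, 800); // overrides the legacy default
        Map<String, Object> conf = new HashMap<>();
        conf.put(RESOURCES_MAP_KEY, generic);

        Map<String, Double> caps = mkSupervisorCapacities(conf);
        System.out.println(caps.get("gpu.count") + " " + caps.get(CPU_CAPACITY_KEY)
            + " " + caps.get(MEMORY_CAPACITY_KEY)); // 2.0 800.0 4096.0
    }
}
```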


---


2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149159775
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/transactional/TransactionalTopologyBuilder.java
 ---
@@ -534,9 +559,33 @@ public BoltDeclarer addConfigurations(Map conf) {
 }
 
 @Override
+public Map getRASConfiguration() {
+for(Map conf : _component.componentConfs) {
+if 
(conf.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
+return conf;
+}
+}
+return new HashMap<>();
+}
+
+@Override
 public BoltDeclarer addSharedMemory(SharedMemory request) {
 _component.sharedMemory.add(request);
 return this;
 }
+
+@SuppressWarnings("unchecked")
+@Override
+public BoltDeclarer addResource(String resourceName, Number 
resourceValue) {
+Map resourcesMap = (Map) 
getRASConfiguration().get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP);
+
+if (resourcesMap == null) {
+resourcesMap = new HashMap<>();
+}
+resourcesMap.put(resourceName, resourceValue.doubleValue());
+
+
getRASConfiguration().put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, 
resourcesMap);
--- End diff --

And here.


---


2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149159593
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java ---
@@ -576,10 +577,30 @@ public T addConfigurations(Map conf) {
 throw new IllegalArgumentException("Cannot set 
serializations for a component using fluent API");
 }
 String currConf = _commons.get(_id).get_json_conf();
-
_commons.get(_id).set_json_conf(mergeIntoJson(Utils.parseJson(currConf), conf));
+
_commons.get(_id).set_json_conf(mergeIntoJson(parseJson(currConf), conf));
 return (T) this;
 }
-
+
+@SuppressWarnings("unchecked")
+@Override
+public T addResource(String resourceName, Number resourceValue) {
+Map resourcesMap = (Map) 
getRASConfiguration().get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP);
+
+if (resourcesMap == null) {
+resourcesMap = new HashMap<>();
+}
+resourcesMap.put(resourceName, resourceValue.doubleValue());
+
+
getRASConfiguration().put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, 
resourcesMap);
--- End diff --

As with the others, this is not going to work: the map you are changing 
is never put back into the serialized JSON config.


---


2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149159736
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/transactional/TransactionalTopologyBuilder.java
 ---
@@ -228,10 +229,34 @@ public SpoutDeclarer addConfigurations(Map conf) {
 }
 
 @Override
+public Map getRASConfiguration() {
+for(Map conf : _spoutConfs) {
+if 
(conf.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
+return conf;
+}
+}
+return new HashMap<>();
+}
+
+@Override
 public SpoutDeclarer addSharedMemory(SharedMemory request) {
 _spoutSharedMemory.add(request);
 return this;
-}
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public SpoutDeclarer addResource(String resourceName, Number 
resourceValue) {
+Map resourcesMap = (Map) 
getRASConfiguration().get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP);
+
+if (resourcesMap == null) {
+resourcesMap = new HashMap<>();
+}
+resourcesMap.put(resourceName, resourceValue.doubleValue());
+
+
getRASConfiguration().put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, 
resourcesMap);
--- End diff --

Here too, I don't think this will work, for the same reasons as elsewhere.


---


2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149144724
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java 
---
@@ -450,9 +456,33 @@ public BoltDeclarer addConfigurations(Map conf) {
 }
 
 @Override
+public Map getRASConfiguration() {
+for (Map conf : _component.componentConfs) {
+if 
(conf.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
+return conf;
+}
+}
+return new HashMap<>();
+}
+
+@Override
 public BoltDeclarer addSharedMemory(SharedMemory request) {
 _component.sharedMemory.add(request);
 return this;
 }
+
+@SuppressWarnings("unchecked")
+@Override
+public BoltDeclarer addResource(String resourceName, Number 
resourceValue) {
+Map resourcesMap = (Map) 
getRASConfiguration().get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP);
+
+if (resourcesMap == null) {
--- End diff --

The code above never returns null.


---


2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149163179
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java ---
@@ -479,6 +499,18 @@ private void addDefaultResforExec(ExecutorDetails 
exec) {
 
 adjustResourcesForExec(exec, defaultResourceList);
 
+Map topologyComponentResourcesMap = (
+Map) this.topologyConf.getOrDefault(
+Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new 
HashMap<>()
+);
+
+assert topologyComponentResourcesMap != null;
+
+//topologyComponentResourcesMap = 
normalizedResourceMap(topologyComponentResourcesMap);
--- End diff --

Not needed; please delete.


---


2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149148257
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java 
---
@@ -450,9 +456,33 @@ public BoltDeclarer addConfigurations(Map conf) {
 }
 
 @Override
+public Map getRASConfiguration() {
+for (Map conf : _component.componentConfs) {
--- End diff --

OK, so this is overly complicated for something that does not need all of 
this complexity. I don't know why `_component.componentConfs` is a list of 
maps instead of a single map, because they will all be merged together in the 
final component. If you could refactor it to be a single map, life would be a 
lot simpler. If not, we should not just return a new empty hash map; we should 
add it to `_component.componentConfs`, or it is going to be lost.
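A minimal sketch of the second option described above, assuming a hypothetical `ComponentStub` that mirrors `_component.componentConfs` as a `List<Map<String, Object>>` and a placeholder `RESOURCES_MAP_KEY` in place of `Config.TOPOLOGY_COMPONENT_RESOURCES_MAP`. When no existing conf holds the resources map, the new map is appended to the list before it is returned, so later writes are not lost:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RasConfigSketch {
    // Placeholder key (hypothetical); stands in for Config.TOPOLOGY_COMPONENT_RESOURCES_MAP.
    static final String RESOURCES_MAP_KEY = "hypothetical.resources.map";

    // Hypothetical stand-in for the builder's _component field.
    static class ComponentStub {
        final List<Map<String, Object>> componentConfs = new ArrayList<>();
    }

    final ComponentStub component = new ComponentStub();

    /** Returns the conf map holding the resources map, registering a new one if none exists. */
    Map<String, Object> getRASConfiguration() {
        for (Map<String, Object> conf : component.componentConfs) {
            if (conf.containsKey(RESOURCES_MAP_KEY)) {
                return conf;
            }
        }
        // Add the new map to componentConfs so it is merged into the final
        // component instead of being silently dropped.
        Map<String, Object> conf = new HashMap<>();
        conf.put(RESOURCES_MAP_KEY, new HashMap<String, Double>());
        component.componentConfs.add(conf);
        return conf;
    }

    /** Records a resource request, normalizing the value to a double. */
    @SuppressWarnings("unchecked")
    RasConfigSketch addResource(String resourceName, Number resourceValue) {
        Map<String, Double> resources =
            (Map<String, Double>) getRASConfiguration().get(RESOURCES_MAP_KEY);
        resources.put(resourceName, resourceValue.doubleValue());
        return this;
    }
}
```

With this shape, repeated `addResource` calls update one registered map, and the `resourcesMap == null` branch is not needed in the sketch. Collapsing `componentConfs` into a single map, the first suggestion above, would remove the search loop as well.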


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149148502
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java 
---
@@ -450,9 +456,33 @@ public BoltDeclarer addConfigurations(Map conf) {
 }
 
 @Override
+public Map getRASConfiguration() {
+for (Map conf : _component.componentConfs) {
+if (conf.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
+return conf;
+}
+}
+return new HashMap<>();
+}
+
+@Override
 public BoltDeclarer addSharedMemory(SharedMemory request) {
 _component.sharedMemory.add(request);
 return this;
 }
+
+@SuppressWarnings("unchecked")
+@Override
+public BoltDeclarer addResource(String resourceName, Number resourceValue) {
+Map resourcesMap = (Map) getRASConfiguration().get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP);
+
+if (resourcesMap == null) {
+resourcesMap = new HashMap<>();
+}
+resourcesMap.put(resourceName, resourceValue.doubleValue());
+
+getRASConfiguration().put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, resourcesMap);
--- End diff --

This does not work; see my comment above inside `getRASConfiguration`.


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149160149
  
--- Diff: storm-client/src/storm.thrift ---
@@ -496,6 +496,8 @@ struct WorkerResources {
 3: optional double cpu;
 4: optional double shared_mem_on_heap; //This is just for accounting mem_on_heap should be used for enforcement
 5: optional double shared_mem_off_heap; //This is just for accounting mem_off_heap should be used for enforcement
+6: optional map resources; // Generic resources Map
+7: optional map shared_resources; // Shared Generic resources Map
--- End diff --

Are we using this? If not, let's hold off on adding it until we support 
it.


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149161667
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
@@ -503,43 +553,42 @@ public boolean wouldFit(
 WorkerSlot ws,
 ExecutorDetails exec,
 TopologyDetails td,
-double maxHeap,
-double memoryAvailable,
-double cpuAvailable) {
-//NOTE this is called lots and lots by schedulers, so anything we can do to make it faster is going to help a lot.
-//CPU is simplest because it does not have odd interactions.
-double cpuNeeded = td.getTotalCpuReqTask(exec);
-if (cpuNeeded > cpuAvailable) {
-if (LOG.isTraceEnabled()) {
-LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}",
-td.getName(),
-exec,
-ws,
-cpuNeeded,
-cpuAvailable);
-}
-//Not enough CPU no need to try any more
-return false;
-}
+Map resourcesAvailable,
+double maxHeap) {
 
-//Lets see if we can make the Memory one fast too, at least in the failure case.
-//The totalMemReq is not really that accurate because it does not include shared memory, but if it does not fit we know
-// Even with shared it will not work
-double minMemNeeded = td.getTotalMemReqTask(exec);
-if (minMemNeeded > memoryAvailable) {
-if (LOG.isTraceEnabled()) {
-LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > {}", td.getName(), exec, ws, minMemNeeded, memoryAvailable);
+Map requestedResources = td.getTotalResources(exec);
--- End diff --

Again, I don't think this is guaranteed to be a `Double`. It needs to be a 
`Number` that we can then convert.
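A sketch of the check written against `Number` rather than `Double`, assuming plain `Map<String, ? extends Number>` inputs as hypothetical stand-ins for `td.getTotalResources(exec)` and `resourcesAvailable`. The memory names are the constants from this PR; memory is skipped here, as in the diff, and a resource missing from the available map is treated as 0.0:

```java
import java.util.HashMap;
import java.util.Map;

public class WouldFitSketch {
    // Memory resource names as defined in the Constants diff of this PR.
    static final String ONHEAP = "onheap.memory.mb";
    static final String OFFHEAP = "offheap.memory.mb";

    /** Returns true if every non-memory requested resource fits in what is available. */
    static boolean fitsNonMemory(Map<String, ? extends Number> requested,
                                 Map<String, ? extends Number> available) {
        for (Map.Entry<String, ? extends Number> entry : requested.entrySet()) {
            String name = entry.getKey();
            // Memory is checked separately because shared memory needs special handling.
            if (ONHEAP.equals(name) || OFFHEAP.equals(name)) {
                continue;
            }
            // Works for Integer, Long, Float or Double values alike.
            double needed = entry.getValue().doubleValue();
            Number avail = available.get(name);
            double have = (avail == null) ? 0.0 : avail.doubleValue();
            if (needed > have) {
                return false;
            }
        }
        return true;
    }
}
```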


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149161456
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
@@ -503,43 +553,42 @@ public boolean wouldFit(
 WorkerSlot ws,
 ExecutorDetails exec,
 TopologyDetails td,
-double maxHeap,
-double memoryAvailable,
-double cpuAvailable) {
-//NOTE this is called lots and lots by schedulers, so anything we can do to make it faster is going to help a lot.
-//CPU is simplest because it does not have odd interactions.
-double cpuNeeded = td.getTotalCpuReqTask(exec);
-if (cpuNeeded > cpuAvailable) {
-if (LOG.isTraceEnabled()) {
-LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}",
-td.getName(),
-exec,
-ws,
-cpuNeeded,
-cpuAvailable);
-}
-//Not enough CPU no need to try any more
-return false;
-}
+Map resourcesAvailable,
+double maxHeap) {
 
-//Lets see if we can make the Memory one fast too, at least in the failure case.
-//The totalMemReq is not really that accurate because it does not include shared memory, but if it does not fit we know
-// Even with shared it will not work
-double minMemNeeded = td.getTotalMemReqTask(exec);
-if (minMemNeeded > memoryAvailable) {
-if (LOG.isTraceEnabled()) {
-LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > {}", td.getName(), exec, ws, minMemNeeded, memoryAvailable);
+Map requestedResources = td.getTotalResources(exec);
+
+for (Entry resourceNeededEntry : requestedResources.entrySet()) {
+String resourceName = resourceNeededEntry.getKey().toString();
+if (resourceName == Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME || resourceName == Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME) {
--- End diff --

These need to use `.equals` instead of `==`. A lot of the time `==` might 
work, because string literals and constants are interned, but there are cases 
with strings (for example, ones built at runtime) where it will not work.
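A small demonstration of the failure mode: string literals are interned, so `==` against a constant often happens to succeed, but a string assembled at runtime (for example, a key parsed from a config file, simulated here with `StringBuilder`) is a different object with the same characters. The class and method names are illustrative only:

```java
public class StringEqualsSketch {
    static final String OFFHEAP = "offheap.memory.mb";

    /** Reference comparison: true only when both refer to the same String object. */
    static boolean isOffHeapByReference(String name) {
        return name == OFFHEAP;
    }

    /** Value comparison: true whenever the characters match; also null-safe. */
    static boolean isOffHeapByValue(String name) {
        return OFFHEAP.equals(name);
    }

    /** Simulates a key assembled at runtime, e.g. read from a YAML or JSON config. */
    static String runtimeKey() {
        return new StringBuilder("offheap").append(".memory.mb").toString();
    }
}
```

Writing the constant on the left (`OFFHEAP.equals(name)`) also avoids a `NullPointerException` when `name` is null.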


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149159863
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/trident/topology/TridentTopologyBuilder.java
 ---
@@ -763,9 +787,33 @@ public BoltDeclarer addConfigurations(Map conf) {
 }
 
 @Override
+public Map getRASConfiguration() {
+for(Map conf : _component.componentConfs) {
+if (conf.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
+return conf;
+}
+}
+return new HashMap<>();
+}
+
+@Override
 public BoltDeclarer addSharedMemory(SharedMemory request) {
 _component.sharedMemory.add(request);
 return this;
 }
+
+@SuppressWarnings("unchecked")
+@Override
+public BoltDeclarer addResource(String resourceName, Number resourceValue) {
+Map resourcesMap = (Map) getRASConfiguration().get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP);
+
+if (resourcesMap == null) {
+resourcesMap = new HashMap<>();
+}
+resourcesMap.put(resourceName, resourceValue.doubleValue());
+
+getRASConfiguration().put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, resourcesMap);
--- End diff --

And here...


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-11-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r149161816
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
@@ -503,43 +553,42 @@ public boolean wouldFit(
 WorkerSlot ws,
 ExecutorDetails exec,
 TopologyDetails td,
-double maxHeap,
-double memoryAvailable,
-double cpuAvailable) {
-//NOTE this is called lots and lots by schedulers, so anything we can do to make it faster is going to help a lot.
-//CPU is simplest because it does not have odd interactions.
-double cpuNeeded = td.getTotalCpuReqTask(exec);
-if (cpuNeeded > cpuAvailable) {
-if (LOG.isTraceEnabled()) {
-LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}",
-td.getName(),
-exec,
-ws,
-cpuNeeded,
-cpuAvailable);
-}
-//Not enough CPU no need to try any more
-return false;
-}
+Map resourcesAvailable,
+double maxHeap) {
 
-//Lets see if we can make the Memory one fast too, at least in the failure case.
-//The totalMemReq is not really that accurate because it does not include shared memory, but if it does not fit we know
-// Even with shared it will not work
-double minMemNeeded = td.getTotalMemReqTask(exec);
-if (minMemNeeded > memoryAvailable) {
-if (LOG.isTraceEnabled()) {
-LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > {}", td.getName(), exec, ws, minMemNeeded, memoryAvailable);
+Map requestedResources = td.getTotalResources(exec);
+
+for (Entry resourceNeededEntry : requestedResources.entrySet()) {
+String resourceName = resourceNeededEntry.getKey().toString();
+if (resourceName == Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME || resourceName == Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME) {
+continue;
+}
+Double resourceNeeded = ObjectReader.getDouble(resourceNeededEntry.getValue());
+Double resourceAvailable = ObjectReader.getDouble(
+resourcesAvailable.getOrDefault(resourceName, null), 0.0);
--- End diff --

`getOrDefault` with `null` as the second argument is the same as calling `get`.
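A quick illustration of that equivalence, plus one way to express the intended 0.0 fallback directly; `availableOrZero` is a hypothetical helper for illustration, not an existing Storm method:

```java
import java.util.HashMap;
import java.util.Map;

public class GetOrDefaultSketch {
    /** Looks up an available amount, treating a missing resource as 0.0. */
    static double availableOrZero(Map<String, ? extends Number> available, String name) {
        // get(name) already returns null for a missing key, so getOrDefault(name, null)
        // adds nothing; the real default (0.0) is applied here instead.
        Number value = available.get(name);
        return (value == null) ? 0.0 : value.doubleValue();
    }
}
```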


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-31 Thread govind-menon
Github user govind-menon commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r148045890
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/scheduler/SupervisorDetails.java ---
@@ -17,12 +17,10 @@
  */
 package org.apache.storm.scheduler;
 
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.Map;
+import java.util.*;
--- End diff --

@Ethanlm Thanks!


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-30 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147755614
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
 ---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.storm.Config;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.Component;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.ResourceUtils;
+import org.apache.storm.scheduler.resource.SchedulingResult;
+import org.apache.storm.scheduler.resource.SchedulingStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy implements IStrategy {
+private static final Logger LOG = LoggerFactory.getLogger(GenericResourceAwareStrategy.class);
+
+@Override
+public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
+prepare(cluster);
+if (nodes.getNodes().size() <= 0) {
+LOG.warn("No available nodes to schedule tasks on!");
+return SchedulingResult.failure(
+SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!");
+}
+Collection unassignedExecutors =
+new HashSet<>(this.cluster.getUnassignedExecutors(td));
+LOG.info("ExecutorsNeedScheduling: {}", unassignedExecutors);
+Collection scheduledTasks = new ArrayList<>();
+List spouts = this.getSpouts(td);
+
+if (spouts.size() == 0) {
+LOG.error("Cannot find a Spout!");
+return SchedulingResult.failure(
+SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!");
+}
+
+//order executors to be scheduled
+List orderedExecutors = this.orderExecutors(td, unassignedExecutors);
+LOG.info("orderedExecutors");
+LOG.info(orderedExecutors.  toString());
+Collection executorsNotScheduled = new HashSet<>(unassignedExecutors);
+List favoredNodes = (List) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES);
+List unFavoredNodes = (List) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES);
+
+for (ExecutorDetails exec : orderedExecutors) {
+LOG.debug(
+"Attempting to schedule: {} of component {}[ REQ {} ]",
+exec,
+td.getExecutorToComponent().get(exec),
+td.getTaskResourceReqList(exec));
+final List sortedNodes = this.sortAllNodes(td, exec, favoredNodes, unFavoredNodes);
+LOG.info("sortedNodes");
+LOG.info(sortedNodes.toString());
+
+scheduleExecutor(exec, td, scheduledTasks, sortedNodes);
+}
+
+executorsNotScheduled.removeAll(scheduledTasks);
+LOG.error("/* Scheduling left over task (most likely sys tasks) */");
+// schedule left over system tasks
+for (ExecutorDetails exec : executorsNotScheduled) {
+final List sortedNodes = this.sortAllNodes(td, exec, favoredNodes, unFavoredNodes);
+LOG.info("sortedNodes");
+LOG.info(sortedNodes.toString());
--- End diff --

same here


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-30 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147737087
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/scheduler/SupervisorDetails.java ---
@@ -17,12 +17,10 @@
  */
 package org.apache.storm.scheduler;
 
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.Map;
+import java.util.*;
--- End diff --

I believe IntelliJ does this automatically. You can change it under 
Preferences --> Editor --> Code Style --> Java, then set "Class count to use 
import with '*'" to a very large number, e.g. 999. Then you should never have 
this problem.


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-30 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147755026
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
 ---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.storm.Config;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.Component;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.ResourceUtils;
+import org.apache.storm.scheduler.resource.SchedulingResult;
+import org.apache.storm.scheduler.resource.SchedulingStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy implements IStrategy {
+private static final Logger LOG = LoggerFactory.getLogger(GenericResourceAwareStrategy.class);
+
+@Override
+public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
+prepare(cluster);
+if (nodes.getNodes().size() <= 0) {
+LOG.warn("No available nodes to schedule tasks on!");
+return SchedulingResult.failure(
+SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!");
+}
+Collection unassignedExecutors =
+new HashSet<>(this.cluster.getUnassignedExecutors(td));
+LOG.info("ExecutorsNeedScheduling: {}", unassignedExecutors);
+Collection scheduledTasks = new ArrayList<>();
+List spouts = this.getSpouts(td);
+
+if (spouts.size() == 0) {
+LOG.error("Cannot find a Spout!");
+return SchedulingResult.failure(
+SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!");
+}
+
+//order executors to be scheduled
+List orderedExecutors = this.orderExecutors(td, unassignedExecutors);
+LOG.info("orderedExecutors");
+LOG.info(orderedExecutors.  toString());
--- End diff --

Would it be better to put them in a single `LOG.info()` call? (I am not sure 
whether `LOG.debug()` would be better here.)


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-30 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147755367
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
 ---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.storm.Config;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.Component;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.ResourceUtils;
+import org.apache.storm.scheduler.resource.SchedulingResult;
+import org.apache.storm.scheduler.resource.SchedulingStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy implements IStrategy {
+private static final Logger LOG = LoggerFactory.getLogger(GenericResourceAwareStrategy.class);
+
+@Override
+public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
+prepare(cluster);
+if (nodes.getNodes().size() <= 0) {
+LOG.warn("No available nodes to schedule tasks on!");
+return SchedulingResult.failure(
+SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!");
+}
+Collection unassignedExecutors =
+new HashSet<>(this.cluster.getUnassignedExecutors(td));
+LOG.info("ExecutorsNeedScheduling: {}", unassignedExecutors);
+Collection scheduledTasks = new ArrayList<>();
+List spouts = this.getSpouts(td);
+
+if (spouts.size() == 0) {
+LOG.error("Cannot find a Spout!");
+return SchedulingResult.failure(
+SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!");
+}
+
+//order executors to be scheduled
+List orderedExecutors = this.orderExecutors(td, unassignedExecutors);
+LOG.info("orderedExecutors");
+LOG.info(orderedExecutors.  toString());
+Collection executorsNotScheduled = new HashSet<>(unassignedExecutors);
+List favoredNodes = (List) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES);
+List unFavoredNodes = (List) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES);
+
+for (ExecutorDetails exec : orderedExecutors) {
+LOG.debug(
+"Attempting to schedule: {} of component {}[ REQ {} ]",
+exec,
+td.getExecutorToComponent().get(exec),
+td.getTaskResourceReqList(exec));
+final List sortedNodes = this.sortAllNodes(td, exec, favoredNodes, unFavoredNodes);
+LOG.info("sortedNodes");
+LOG.info(sortedNodes.toString());
+
--- End diff --

same here


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-27 Thread govind-menon
GitHub user govind-menon reopened a pull request:

https://github.com/apache/storm/pull/2385

STORM-2727: Generic Resource Aware Scheduling

Remaining

1. Add Tests
2. Do more manual testing

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/govind-menon/storm YSTORM-2727

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2385.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2385


commit facf515b121c80eed6c02d74104e323cfb9e4a1d
Author: Govind Menon 
Date:   2017-09-07T18:50:05Z

YSTORM-2725: Generic Resource Scheduling - initial config changes and TopologyBuilder API

commit 4337d44b4f95da34f2b1d0a4160353ab81c0fab0
Author: Govind Menon 
Date:   2017-10-23T15:31:08Z

YSTORM-2727: Generic Resource Aware Scheduling(GRAS) - metadata, scheduling strategies and configuration enhancements

commit 8e867fcf9183e8476658bff7119d9de8545e9aed
Author: Govind Menon 
Date:   2017-10-23T15:31:08Z

YSTORM-2727: Generic Resource Aware Scheduling(GRAS) - metadata, scheduling strategies and configuration enhancements




---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-27 Thread govind-menon
Github user govind-menon closed the pull request at:

https://github.com/apache/storm/pull/2385


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147236780
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
@@ -664,6 +722,9 @@ private double calculateSharedOffHeapMemory(String nodeId, SchedulerAssignmentIm
 
 private double calculateSharedOffHeapMemory(
 String nodeId, SchedulerAssignmentImpl assignment, ExecutorDetails extra) {
+if (assignment == null) {
--- End diff --

When is the assignment null???  Why would we try to calculate the shared 
off heap memory for no assignment???


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147237164
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java ---
@@ -168,6 +172,15 @@ private void initResourceList() {
 // topology.getbolt (AKA sys tasks most specifically __acker tasks)
 for (ExecutorDetails exec : getExecutors()) {
 if (!resourceList.containsKey(exec)) {
+// TODO: Update this debug statement
--- End diff --

Either update it or remove the TODO


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147234270
  
--- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
@@ -240,6 +240,12 @@
 public static final String TOPOLOGY_TASKS = "topology.tasks";
 
 /**
+ * A map of resources used by each component e.g {"cpu" : 200.0. "onheap.memory.mb": 256.0, "gpu" : 0.5 }
+ */
+@isMapEntryType(keyType = String.class, valueType = Double.class)
--- End diff --

It is really hard to enforce a double in the conf. This is because neither 
JSON nor YAML knows about this; the parser puts the parsed number in the 
smallest type that matches it, i.e., 1 becomes an integer, not a double. 1.0 
might be a double, but it might also be a float; I don't know. If the number 
is really big, it could be a long.
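A minimal sketch of reading such a map without assuming `Double`, assuming the parsed values arrive as a `Map<String, Object>` whose numbers may be `Integer`, `Long`, `Float`, or `Double`, as described above. `ResourceMapNormalizer` is a hypothetical name; the point is to widen through `Number.doubleValue()` instead of casting:

```java
import java.util.HashMap;
import java.util.Map;

public class ResourceMapNormalizer {
    /**
     * Converts a parsed resource map whose values may be Integer, Long, Float or Double
     * into a Map<String, Double>. Non-numeric values are rejected.
     */
    static Map<String, Double> normalize(Map<String, Object> parsed) {
        Map<String, Double> result = new HashMap<>();
        for (Map.Entry<String, Object> entry : parsed.entrySet()) {
            Object value = entry.getValue();
            if (!(value instanceof Number)) {
                throw new IllegalArgumentException(
                    "Resource " + entry.getKey() + " is not a number: " + value);
            }
            // Widen through Number; a direct (Double) cast would throw
            // ClassCastException for an Integer or Long value.
            result.put(entry.getKey(), ((Number) value).doubleValue());
        }
        return result;
    }
}
```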


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147234853
  
--- Diff: storm-client/src/jvm/org/apache/storm/Constants.java ---
@@ -56,5 +58,20 @@
 public static final String STORM_ACTIVE_ATOM = "storm-active-atom";
 public static final String COMPONENT_TO_DEBUG_ATOM = "storm-component->debug-atom";
 public static final Object LOAD_MAPPING = "load-mapping";
+
+public static final String COMMON_CPU_RESOURCE_NAME = "cpu.pcore.percent";
+public static final String COMMON_ONHEAP_MEMORY_RESOURCE_NAME = "onheap.memory.mb";
+public static final String COMMON_OFFHEAP_MEMORY_RESOURCE_NAME = "offheap.memory.mb";
+public static final String COMMON_TOTAL_MEMORY_RESOURCE_NAME = "memory.mb";
--- End diff --

Can we document these somewhere? The configs show some examples, but they 
are not exhaustive, and the rebalance command is especially bad at this, so a 
user has no idea what to use as the key.
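One possible shape for that documentation, sketched as a hypothetical `ResourceNameDocs` class: each common name from the `Constants` diff gets a one-line description (the wording is illustrative, inferred from the names), and a help string is built from the same table so a CLI such as rebalance could print it instead of leaving users to guess:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical catalog of the common resource names added to Constants in this PR. */
public final class ResourceNameDocs {
    private ResourceNameDocs() {}

    /** Known resource names mapped to a one-line description, in display order. */
    static final Map<String, String> KNOWN;

    static {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("cpu.pcore.percent", "CPU, as a percent of one physical core");
        m.put("onheap.memory.mb", "On-heap memory in MB");
        m.put("offheap.memory.mb", "Off-heap memory in MB");
        m.put("memory.mb", "Total memory in MB");
        KNOWN = Collections.unmodifiableMap(m);
    }

    /** Builds help text that a command-line tool could print. */
    static String helpText() {
        StringBuilder sb = new StringBuilder("Known resource names:\n");
        for (Map.Entry<String, String> e : KNOWN.entrySet()) {
            sb.append("  ").append(e.getKey()).append(" - ").append(e.getValue()).append('\n');
        }
        return sb.toString();
    }
}
```

Keeping the descriptions next to the names means the help text cannot drift from the set of supported keys.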


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147235417
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/scheduler/SupervisorDetails.java ---
@@ -17,12 +17,10 @@
  */
 package org.apache.storm.scheduler;
 
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.Map;
+import java.util.*;
--- End diff --

I think wildcard imports (`.*`) are against the coding standard.


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147233694
  
--- Diff: 
examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java 
---
@@ -430,37 +427,37 @@ public static void main(String[] args) throws Exception {
 return topologyResources;
 }
 
-static void checkIntialization(Map topologyResources, String com,
-  Map topologyConf) {
-checkInitMem(topologyResources, com, topologyConf);
-checkInitCpu(topologyResources, com, topologyConf);
-}
+/**
+ * Checks if the topology's resource requirements are initialized.
+ * @param topologyResources map of resouces requirements
+ * @param componentId component for which initialization is being 
conducted
+ * @param topologyConf topology configuration
+ * @throws Exception on any error
+ */
+public static void checkInitialization(Map topologyResources, String componentId, Map topologyConf) {
+StringBuilder msgBuilder = new StringBuilder();
 
-static void checkInitMem(Map topologyResources, String com,
- Map topologyConf) {
-if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
-Double onHeap = ObjectReader.getDouble(
-topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
-if (onHeap != null) {
-topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap);
-}
+for (String resourceName : topologyResources.keySet()) {
+msgBuilder.append(checkInitResource(topologyResources, topologyConf, resourceName));
 }
-if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
-Double offHeap = ObjectReader.getDouble(
-topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
-if (offHeap != null) {
-topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap);
-}
+
+if (msgBuilder.length() > 0) {
+String resourceDefaults = msgBuilder.toString();
+LOG.debug(
+"Unable to extract resource requirement for Component {} \n Resources : {}",
+componentId, resourceDefaults);
 }
 }
 
-static void checkInitCpu(Map topologyResources, String com,
- Map topologyConf) {
-if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
-Double cpu = ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null);
-if (cpu != null) {
-topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, cpu);
-}
+private static String checkInitResource(Map topologyResources, Map topologyConf, String resourceName) {
+StringBuilder msgBuilder = new StringBuilder();
+if (topologyResources.containsKey(resourceName)) {
+Double resourceValue = (Double) topologyConf.getOrDefault(resourceName, null);
--- End diff --

I think you missed something in the translation.  I think it should be more 
like

```
if (topologyResources.containsKey(resourceName)) {
    Double resourceValue = ObjectReader.getDouble(topologyConf.get(resourceName), null);
    if (resourceValue != null) {
        topologyResources.put(resourceName, resourceValue);
    }
    ...
```


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147235680
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java 
---
@@ -79,8 +81,18 @@ public T setMemoryLoad(Number onHeap, Number offHeap) {
 @Override
 public T setCPULoad(Number amount) {
 if(amount != null) {
+addResource(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, amount);
 return addConfiguration(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, amount);
 }
 return (T) this;
 }
+
+@SuppressWarnings("unchecked")
+@Override
+public T addResources(Map resources) {
+if(resources != null) {
--- End diff --

nit: you need a space after the if.


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147237442
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
 ---
@@ -189,13 +201,20 @@ private static void checkInitCpu(Map 
topologyResources, String c
 null);
 
topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu);
 }
-LOG.debug("Topology Resources {}", topologyResources);
+
+// If resource is also present in resources map will 
overwrite the above
+if 
(jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
+topologyResources.putAll((Map) 
jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP));
+}
+LOG.info("Topology Resources {}", topologyResources);
 }
 } catch (ParseException e) {
 LOG.error("Failed to parse component resources is:" + 
e.toString(), e);
 return null;
 }
-return topologyResources;
+LOG.info("Topology Resources {}", 
normalizedResourceMap(topologyResources));
+LOG.info("Topology Resources {}", 
normalizedResourceMap(topologyResources));
--- End diff --

Why do we need to log this twice? Or at all?
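The hunk above applies the legacy per-resource settings first and then `putAll`s the entries from `TOPOLOGY_COMPONENT_RESOURCES_MAP`, so the generic map wins whenever both set the same key. A minimal sketch of that precedence, assuming plain `Map<String, Double>` inputs and illustrative key names, returns the merged map without logging and leaves any logging (at debug level, if needed at all) to the caller:

```java
import java.util.HashMap;
import java.util.Map;

public class ResourceMergeSketch {

    // Merge legacy per-resource settings with a generic resources map.
    // Entries from the generic map are applied last, so they win on conflicts,
    // matching the "will overwrite the above" comment in the hunk.
    static Map<String, Double> merge(Map<String, Double> legacyResources,
                                     Map<String, Double> genericResources) {
        Map<String, Double> merged = new HashMap<>(legacyResources);
        if (genericResources != null) {
            merged.putAll(genericResources);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Double> legacy = new HashMap<>();
        legacy.put("cpu", 10.0);
        legacy.put("onheap.memory.mb", 128.0);

        Map<String, Double> generic = new HashMap<>();
        generic.put("cpu", 50.0);
        generic.put("gpu", 1.0);

        Map<String, Double> merged = merge(legacy, generic);
        System.out.println(merged.get("cpu"));              // 50.0
        System.out.println(merged.get("onheap.memory.mb")); // 128.0
        System.out.println(merged.get("gpu"));              // 1.0
    }
}
```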


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147236218
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
@@ -503,62 +553,70 @@ public boolean wouldFit(
 WorkerSlot ws,
 ExecutorDetails exec,
 TopologyDetails td,
-double maxHeap,
-double memoryAvailable,
-double cpuAvailable) {
-//NOTE this is called lots and lots by schedulers, so anything we 
can do to make it faster is going to help a lot.
-//CPU is simplest because it does not have odd interactions.
-double cpuNeeded = td.getTotalCpuReqTask(exec);
-if (cpuNeeded > cpuAvailable) {
-if (LOG.isTraceEnabled()) {
-LOG.trace("Could not schedule {}:{} on {} not enough CPU 
{} > {}",
-td.getName(),
-exec,
-ws,
-cpuNeeded,
-cpuAvailable);
+Map resourcesAvailable,
+double maxHeap) {
+
+Map requestedResources = 
td.getTotalResources(exec);
+
+LOG.info(td.getName());
+LOG.info("requested");
+LOG.info(requestedResources.toString());
+LOG.info("available");
+LOG.info(resourcesAvailable.toString());
+LOG.info(ws.toString());
--- End diff --

I think this is debug logging.  Can we please remove it?


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147236953
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
@@ -993,6 +1054,31 @@ public WorkerResources getWorkerResources(WorkerSlot 
ws) {
 }
 
 @Override
+public Map getAllScheduledResourcesForNode(String 
nodeId) {
+Map totalScheduledResources = new HashMap<>();
+for (SchedulerAssignmentImpl assignment : assignments.values()) {
+for (Entry entry :
+assignment.getScheduledResources().entrySet()) {
+if (nodeId.equals(entry.getKey().getNodeId())) {
+WorkerResources resources = entry.getValue();
+for (Map.Entry resourceEntry : 
resources.get_resources().entrySet()) {
--- End diff --

Could you please add in the types for the Map.Entry?
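With typed entries the per-node aggregation needs no casts. The sketch below is a simplified, hypothetical model of `getAllScheduledResourcesForNode`: `ScheduledWorker` stands in for the `WorkerSlot`/`WorkerResources` pairs the real code iterates over, and `Map.merge` with `Double::sum` adds up each resource across the workers on the node.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NodeResourceSketch {

    // Hypothetical simplified model of one scheduled worker:
    // the node it runs on and the resources assigned to it.
    static final class ScheduledWorker {
        final String nodeId;
        final Map<String, Double> resources;

        ScheduledWorker(String nodeId, Map<String, Double> resources) {
            this.nodeId = nodeId;
            this.resources = resources;
        }
    }

    // Sum every resource scheduled on nodeId. The typed Map.Entry<String, Double>
    // means no casts or raw types are needed.
    static Map<String, Double> totalScheduledForNode(List<ScheduledWorker> workers, String nodeId) {
        Map<String, Double> totalScheduledResources = new HashMap<>();
        for (ScheduledWorker worker : workers) {
            if (!nodeId.equals(worker.nodeId)) {
                continue;
            }
            for (Map.Entry<String, Double> resourceEntry : worker.resources.entrySet()) {
                totalScheduledResources.merge(resourceEntry.getKey(), resourceEntry.getValue(), Double::sum);
            }
        }
        return totalScheduledResources;
    }

    public static void main(String[] args) {
        Map<String, Double> w1 = new HashMap<>();
        w1.put("cpu", 50.0);
        w1.put("gpu", 1.0);
        Map<String, Double> w2 = new HashMap<>();
        w2.put("cpu", 25.0);
        Map<String, Double> w3 = new HashMap<>();
        w3.put("cpu", 100.0);

        List<ScheduledWorker> workers = new ArrayList<>();
        workers.add(new ScheduledWorker("node-1", w1));
        workers.add(new ScheduledWorker("node-1", w2));
        workers.add(new ScheduledWorker("node-2", w3));

        System.out.println(totalScheduledForNode(workers, "node-1").get("cpu")); // 75.0
    }
}
```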


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147237298
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
 ---
@@ -18,22 +18,24 @@
 
 package org.apache.storm.scheduler.resource;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
--- End diff --

Again, let's avoid .* imports.


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147235287
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/generated/WorkerResources.java ---
@@ -1,21 +1,4 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
  * Autogenerated by Thrift Compiler (0.9.3)
--- End diff --

What happened to the license header?  Please run the shell script to 
generate the thrift code instead of doing it on your own.


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147236417
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
@@ -503,62 +553,70 @@ public boolean wouldFit(
 WorkerSlot ws,
 ExecutorDetails exec,
 TopologyDetails td,
-double maxHeap,
-double memoryAvailable,
-double cpuAvailable) {
-//NOTE this is called lots and lots by schedulers, so anything we 
can do to make it faster is going to help a lot.
-//CPU is simplest because it does not have odd interactions.
-double cpuNeeded = td.getTotalCpuReqTask(exec);
-if (cpuNeeded > cpuAvailable) {
-if (LOG.isTraceEnabled()) {
-LOG.trace("Could not schedule {}:{} on {} not enough CPU 
{} > {}",
-td.getName(),
-exec,
-ws,
-cpuNeeded,
-cpuAvailable);
+Map resourcesAvailable,
+double maxHeap) {
+
+Map requestedResources = 
td.getTotalResources(exec);
+
+LOG.info(td.getName());
+LOG.info("requested");
+LOG.info(requestedResources.toString());
+LOG.info("available");
+LOG.info(resourcesAvailable.toString());
+LOG.info(ws.toString());
+for (Entry resourceNeededEntry : requestedResources.entrySet()) {
+String resourceName = resourceNeededEntry.getKey().toString();
+if (resourceName == 
Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME || resourceName == 
Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME) {
+continue;
 }
-//Not enough CPU no need to try any more
-return false;
-}
-
-//Lets see if we can make the Memory one fast too, at least in the 
failure case.
-//The totalMemReq is not really that accurate because it does not 
include shared memory, but if it does not fit we know
-// Even with shared it will not work
-double minMemNeeded = td.getTotalMemReqTask(exec);
-if (minMemNeeded > memoryAvailable) {
-if (LOG.isTraceEnabled()) {
-LOG.trace("Could not schedule {}:{} on {} not enough Mem 
{} > {}", td.getName(), exec, ws, minMemNeeded, memoryAvailable);
+Double resourceNeeded = 
ObjectReader.getDouble(resourceNeededEntry.getValue());
+Double resourceAvailable = ObjectReader.getDouble(
+resourcesAvailable.getOrDefault(resourceName, null), 
0.0);
+if (resourceNeeded > resourceAvailable) {
+if (true) {
--- End diff --

I think this is a debug logging change.  Do you want to change it back?
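For comparison, here is a minimal sketch of the generic per-resource check without the temporary `if (true)` and `LOG.info` calls. It assumes simplified `Map<String, Double>` inputs and hypothetical memory resource names in place of `Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME` and `Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME`. Memory is skipped because `wouldFit` checks it separately afterwards, a resource the node does not advertise counts as 0.0 available, and names are matched with `equals()` (via `Set.contains`) rather than the `==` in the diff, which compares `String` references rather than contents. Returning the failing resource name lets the caller log it at trace level behind `LOG.isTraceEnabled()`, as the removed code did.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class WouldFitSketch {

    // Hypothetical names standing in for Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME
    // and Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME.
    static final Set<String> MEMORY_RESOURCES =
        new HashSet<>(Arrays.asList("onheap.memory.mb", "offheap.memory.mb"));

    // Returns the name of the first non-memory resource whose request exceeds what is
    // available, or null if every non-memory resource fits.
    static String firstShortfall(Map<String, Double> requested, Map<String, Double> available) {
        for (Map.Entry<String, Double> entry : requested.entrySet()) {
            String resourceName = entry.getKey();
            // Set.contains uses equals(), so names are compared by content, not reference.
            if (MEMORY_RESOURCES.contains(resourceName)) {
                continue; // memory is checked separately (shared memory, max heap)
            }
            double needed = entry.getValue();
            // A resource the node does not advertise counts as 0.0 available.
            double avail = available.getOrDefault(resourceName, 0.0);
            if (needed > avail) {
                return resourceName;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, Double> requested = new HashMap<>();
        requested.put("cpu", 50.0);
        requested.put("gpu", 2.0);
        requested.put("onheap.memory.mb", 512.0);

        Map<String, Double> available = new HashMap<>();
        available.put("cpu", 100.0);
        available.put("gpu", 1.0);

        String shortfall = firstShortfall(requested, available);
        if (shortfall != null) {
            System.out.println("not enough " + shortfall); // not enough gpu
        }
    }
}
```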


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147234354
  
--- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
@@ -1272,6 +1278,12 @@
 public static final String SUPERVISOR_CPU_CAPACITY = 
"supervisor.cpu.capacity";
 
 /**
+ * A map of resources the Supervisor has e.g {"cpu" : 200.0. 
"memory.capacity.mb": 256.0, "gpu" : 0.5 }
+ */
+@isMapEntryType(keyType = String.class, valueType = Double.class)
--- End diff --

Same comment here.  Trying to force the type to be a double is going to be 
difficult with how we use the configs.
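One way to avoid forcing `Double` is to accept any `Number` and convert when the config is read. Config parsers typically return `Integer` or `Long` for whole numbers, so a user who writes `200` instead of `200.0` for `cpu` would otherwise fail a `Double`-only check. A minimal sketch of such normalization, assuming the map has already been read from the config (Storm's own `ObjectReader.getDouble` serves a similar purpose elsewhere in this PR), using the keys from the javadoc example above:

```java
import java.util.HashMap;
import java.util.Map;

public class ResourceConfigSketch {

    // Convert a resources map read from config into Map<String, Double>.
    // Whole numbers commonly arrive as Integer or Long, so accept any Number.
    static Map<String, Double> normalize(Map<String, Object> raw) {
        Map<String, Double> result = new HashMap<>();
        for (Map.Entry<String, Object> entry : raw.entrySet()) {
            Object value = entry.getValue();
            if (!(value instanceof Number)) {
                throw new IllegalArgumentException(
                    "Resource " + entry.getKey() + " must be numeric but was " + value);
            }
            result.put(entry.getKey(), ((Number) value).doubleValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> raw = new HashMap<>();
        raw.put("cpu", 200);                  // Integer
        raw.put("memory.capacity.mb", 256L);  // Long
        raw.put("gpu", 0.5);                  // Double

        Map<String, Double> resources = normalize(raw);
        System.out.println(resources.get("cpu"));                // 200.0
        System.out.println(resources.get("memory.capacity.mb")); // 256.0
        System.out.println(resources.get("gpu"));                // 0.5
    }
}
```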


---


[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling

2017-10-26 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2385#discussion_r147236577
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
@@ -503,62 +553,70 @@ public boolean wouldFit(
 WorkerSlot ws,
 ExecutorDetails exec,
 TopologyDetails td,
-double maxHeap,
-double memoryAvailable,
-double cpuAvailable) {
-//NOTE this is called lots and lots by schedulers, so anything we 
can do to make it faster is going to help a lot.
-//CPU is simplest because it does not have odd interactions.
-double cpuNeeded = td.getTotalCpuReqTask(exec);
-if (cpuNeeded > cpuAvailable) {
-if (LOG.isTraceEnabled()) {
-LOG.trace("Could not schedule {}:{} on {} not enough CPU 
{} > {}",
-td.getName(),
-exec,
-ws,
-cpuNeeded,
-cpuAvailable);
+Map resourcesAvailable,
+double maxHeap) {
+
+Map requestedResources = 
td.getTotalResources(exec);
+
+LOG.info(td.getName());
+LOG.info("requested");
+LOG.info(requestedResources.toString());
+LOG.info("available");
+LOG.info(resourcesAvailable.toString());
+LOG.info(ws.toString());
+for (Entry resourceNeededEntry : requestedResources.entrySet()) {
+String resourceName = resourceNeededEntry.getKey().toString();
+if (resourceName == 
Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME || resourceName == 
Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME) {
+continue;
 }
-//Not enough CPU no need to try any more
-return false;
-}
-
-//Lets see if we can make the Memory one fast too, at least in the 
failure case.
-//The totalMemReq is not really that accurate because it does not 
include shared memory, but if it does not fit we know
-// Even with shared it will not work
-double minMemNeeded = td.getTotalMemReqTask(exec);
-if (minMemNeeded > memoryAvailable) {
-if (LOG.isTraceEnabled()) {
-LOG.trace("Could not schedule {}:{} on {} not enough Mem 
{} > {}", td.getName(), exec, ws, minMemNeeded, memoryAvailable);
+Double resourceNeeded = 
ObjectReader.getDouble(resourceNeededEntry.getValue());
+Double resourceAvailable = ObjectReader.getDouble(
+resourcesAvailable.getOrDefault(resourceName, null), 
0.0);
+if (resourceNeeded > resourceAvailable) {
+if (true) {
+LOG.info("Could not schedule {}:{} on {} not enough {} 
{} > {}",
+td.getName(),
+exec,
+ws,
+resourceName,
+resourceNeeded,
+resourceAvailable);
+}
+//Not enough resources - stop trying
+return false;
 }
-//Not enough minimum MEM no need to try any more
-return false;
 }
 
 double currentTotal = 0.0;
 double afterTotal = 0.0;
 double afterOnHeap = 0.0;
+
 Set wouldBeAssigned = new HashSet<>();
 wouldBeAssigned.add(exec);
 SchedulerAssignmentImpl assignment = assignments.get(td.getId());
+
 if (assignment != null) {
 Collection currentlyAssigned = 
assignment.getSlotToExecutors().get(ws);
 if (currentlyAssigned != null) {
 wouldBeAssigned.addAll(currentlyAssigned);
 WorkerResources wrCurrent = calculateWorkerResources(td, 
currentlyAssigned);
 currentTotal = wrCurrent.get_mem_off_heap() + 
wrCurrent.get_mem_on_heap();
 }
-WorkerResources wrAfter = calculateWorkerResources(td, 
wouldBeAssigned);
-afterTotal = wrAfter.get_mem_off_heap() + 
wrAfter.get_mem_on_heap();
-afterOnHeap = wrAfter.get_mem_on_heap();
-
-currentTotal += calculateSharedOffHeapMemory(ws.getNodeId(), 
assignment);
-afterTotal += calculateSharedOffHeapMemory(ws.getNodeId(), 
assignment, exec);
 }
+WorkerResources wrAfter = calculateWorkerResources(td, 
wouldBeAssigned);
+afterTotal = wrAfter.get_mem_off_heap() + 
wrAfter.get_mem_on_heap();
+afterOnHeap = wrAfter.get_mem_on_heap();
+
+currentTotal +=