[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2385 ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user govind-menon commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149523415 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java --- @@ -18,93 +18,58 @@ package org.apache.storm.scheduler.resource.strategies.scheduling; -import com.google.common.annotations.VisibleForTesting; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.TreeSet; - import org.apache.storm.Config; -import org.apache.storm.generated.ComponentType; -import org.apache.storm.scheduler.Cluster; -import org.apache.storm.scheduler.Component; -import org.apache.storm.scheduler.ExecutorDetails; -import org.apache.storm.scheduler.TopologyDetails; -import org.apache.storm.scheduler.WorkerSlot; -import org.apache.storm.scheduler.resource.RAS_Node; -import org.apache.storm.scheduler.resource.RAS_Nodes; +import org.apache.storm.scheduler.*; import org.apache.storm.scheduler.resource.ResourceUtils; import org.apache.storm.scheduler.resource.SchedulingResult; import org.apache.storm.scheduler.resource.SchedulingStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DefaultResourceAwareStrategy implements IStrategy { -private static final Logger LOG = LoggerFactory.getLogger(DefaultResourceAwareStrategy.class); -private Cluster cluster; -private MapnetworkTopography; -private RAS_Nodes nodes; +import java.util.*; -@VisibleForTesting -void prepare(Cluster cluster) { -this.cluster = cluster; -nodes = new RAS_Nodes(cluster); -networkTopography = cluster.getNetworkTopography(); -logClusterInfo(); -} +public class DefaultResourceAwareStrategy extends BaseResourceAwareStrategy implements IStrategy { --- End diff -- I think that 
should come in later; ideally there should be a release without GRAS on by default so people can try it out themselves. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user govind-menon commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149511408 --- Diff: storm-client/src/storm.thrift --- @@ -496,6 +496,8 @@ struct WorkerResources { 3: optional double cpu; 4: optional double shared_mem_on_heap; //This is just for accounting mem_on_heap should be used for enforcement 5: optional double shared_mem_off_heap; //This is just for accounting mem_off_heap should be used for enforcement +6: optional mapresources; // Generic resources Map +7: optional map shared_resources; // Shared Generic resources Map --- End diff -- We're definitely using it. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user govind-menon commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149225052 --- Diff: examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java --- @@ -430,37 +427,39 @@ public static void main(String[] args) throws Exception { return topologyResources; } -static void checkIntialization(Map topologyResources, String com, - Map topologyConf) { -checkInitMem(topologyResources, com, topologyConf); -checkInitCpu(topologyResources, com, topologyConf); -} +/** + * Checks if the topology's resource requirements are initialized. + * @param topologyResources map of resouces requirements --- End diff -- @revans2 I agree; I was just following how checkInitialization behaved before. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149143172 --- Diff: storm-client/src/jvm/org/apache/storm/Config.java --- @@ -241,6 +241,12 @@ public static final String TOPOLOGY_TASKS = "topology.tasks"; /** + * A map of resources used by each component e.g {"cpu" : 200.0. "onheap.memory.mb": 256.0, "gpu" : 0.5 } --- End diff -- Do we actually want users setting `cpu` as the name of one of the resources? Because in the code we are looking for `cpu.pcore.percent` or `topology.component.cpu.pcore.percent` and I really think this is going to confuse our users. Especially if we are also telling them to use this for the `SUPERVISOR_RESOURCES_MAP`. Can we please make sure that the comments here match with what we would expect the user to actually set, and ideally have them match any naming convention that we would want them to use? ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149161951 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java --- @@ -556,6 +605,9 @@ public boolean wouldFit( } double memoryAdded = afterTotal - currentTotal; +double memoryAvailable = ObjectReader.getDouble( + resourcesAvailable.getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, null), 0.0); --- End diff -- Same as above, just make it a get. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149164076 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java --- @@ -129,41 +137,65 @@ public static String getJsonWithUpdatedResources(String jsonConf, MaptopologyResources, String com, - Map topologyConf) { -checkInitMem(topologyResources, com, topologyConf); -checkInitCpu(topologyResources, com, topologyConf); -} +public static void checkInitialization(Map topologyResources, String componentId, Map topologyConf) { +StringBuilder msgBuilder = new StringBuilder(); -private static void checkInitMem(Map topologyResources, String com, -Map topologyConf) { -if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) { -Double onHeap = ObjectReader.getDouble( - topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null); -if (onHeap != null) { - topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap); -debugMessage("ONHEAP", com, topologyConf); -} +Set resourceNameSet = new HashSet<>(); + +resourceNameSet.add( +Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT +); +resourceNameSet.add( +Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB +); +resourceNameSet.add( +Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB +); + +Map topologyComponentResourcesMap = +(Map ) topologyConf.getOrDefault( +Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, Collections.emptyMap()); + +resourceNameSet.addAll(topologyResources.keySet()); +resourceNameSet.addAll(topologyComponentResourcesMap.keySet()); + +for (String resourceName : resourceNameSet) { +msgBuilder.append(checkInitResource(topologyResources, topologyConf, topologyComponentResourcesMap, resourceName)); } -if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) { -Double offHeap = ObjectReader.getDouble( - 
topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null); -if (offHeap != null) { - topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap); -debugMessage("OFFHEAP", com, topologyConf); -} + +Map normalizedTopologyResources = normalizedResourceMap(topologyResources); +topologyResources.clear(); +topologyResources.putAll(normalizedTopologyResources); + +if (msgBuilder.length() > 0) { +String resourceDefaults = msgBuilder.toString(); +LOG.debug( +"Unable to extract resource requirement for Component {} \n Resources : {}", +componentId, resourceDefaults); } } -private static void checkInitCpu(Map topologyResources, String com, - Map topologyConf) { -if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) { -Double cpu = ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null); -if (cpu != null) { - topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, cpu); -debugMessage("CPU", com, topologyConf); +private static String checkInitResource(Map topologyResources, Map topologyConf, +Map topologyComponentResourcesMap, String resourceName) { +StringBuilder msgBuilder = new StringBuilder(); +String normalizedResourceName = resourceNameMapping.getOrDefault(resourceName, resourceName); +if (!topologyResources.containsKey(normalizedResourceName)) { +if (topologyConf.containsKey(resourceName)) { +Double resourceValue = ObjectReader.getDouble(topologyConf.get(resourceName)); +if(resourceValue!=null) { +topologyResources.put(normalizedResourceName, resourceValue);
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149162901 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java --- @@ -289,17 +300,13 @@ public Double getTotalMemReqTask(ExecutorDetails exec) { /** * Gets the total memory resource list for a set of tasks that is part of a topology. - * @return Mapa map of the total memory requirement for all tasks in topology topoId. + * @param executors + * @return Map , --- End diff -- The type is not needed here, and I think it all works on a single line. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149162982 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java --- @@ -289,17 +300,13 @@ public Double getTotalMemReqTask(ExecutorDetails exec) { /** * Gets the total memory resource list for a set of tasks that is part of a topology. - * @return Mapa map of the total memory requirement for all tasks in topology topoId. + * @param executors --- End diff -- nit: Having a param with no explanation is not really helpful. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149161531 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java --- @@ -503,43 +553,42 @@ public boolean wouldFit( WorkerSlot ws, ExecutorDetails exec, TopologyDetails td, -double maxHeap, -double memoryAvailable, -double cpuAvailable) { -//NOTE this is called lots and lots by schedulers, so anything we can do to make it faster is going to help a lot. -//CPU is simplest because it does not have odd interactions. -double cpuNeeded = td.getTotalCpuReqTask(exec); -if (cpuNeeded > cpuAvailable) { -if (LOG.isTraceEnabled()) { -LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}", -td.getName(), -exec, -ws, -cpuNeeded, -cpuAvailable); -} -//Not enough CPU no need to try any more -return false; -} +MapresourcesAvailable, +double maxHeap) { -//Lets see if we can make the Memory one fast too, at least in the failure case. -//The totalMemReq is not really that accurate because it does not include shared memory, but if it does not fit we know -// Even with shared it will not work -double minMemNeeded = td.getTotalMemReqTask(exec); -if (minMemNeeded > memoryAvailable) { -if (LOG.isTraceEnabled()) { -LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > {}", td.getName(), exec, ws, minMemNeeded, memoryAvailable); +Map requestedResources = td.getTotalResources(exec); + +for (Entry resourceNeededEntry : requestedResources.entrySet()) { +String resourceName = resourceNeededEntry.getKey().toString(); --- End diff -- Why are we calling toString on a String? ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149162640 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java --- @@ -133,30 +136,31 @@ public StormTopology getTopology() { private void initResourceList() { this.resourceList = new HashMap<>(); -// Extract bolt memory info +// Extract bolt resource info if (topology.get_bolts() != null) { for (Map.Entrybolt : topology.get_bolts().entrySet()) { //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad) Map topologyResources = ResourceUtils.parseResources(bolt.getValue().get_common().get_json_conf()); -ResourceUtils.checkIntialization(topologyResources, bolt.getKey(), topologyConf); +ResourceUtils.checkInitialization(topologyResources, bolt.getKey(), this.topologyConf); for (Map.Entry anExecutorToComponent : executorToComponent.entrySet()) { -if (bolt.getKey().equals(anExecutorToComponent.getValue())) { +if (bolt.getKey().equals(anExecutorToComponent.getValue()) && topologyResources.keySet().size() > 0) { --- End diff -- nit: `!topologyResources.isEmpty()` is a bit cleaner. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149164720 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java --- @@ -238,4 +277,57 @@ public static double sum(Collection list) { public static double avg(Collection list) { return sum(list) / list.size(); } + +/** + * Normalizes a supervisor resource map or topology details map's keys to universal resource names. + * @param resourceMap resource map of either Supervisor or Topology + * @return the resource map with common resource names + */ +public static Map normalizedResourceMap(Map resourceMap) { +Map result = new HashMap(); + +result.putAll(resourceMap); +for(Map.Entry entry: resourceMap.entrySet()) { +if (resourceNameMapping.containsKey(entry.getKey())) { +result.put(resourceNameMapping.get(entry.getKey()), ObjectReader.getDouble(entry.getValue(), 0.0)); +result.remove(entry.getKey()); +} +} +return result; +} + +public static Map addResources(Map resourceMap1, Map resourceMap2) { +Map result = new HashMap(); + +result.putAll(resourceMap1); + +for(Map.Entry entry: resourceMap2.entrySet()) { --- End diff -- nit: space after the for. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149164027 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java --- @@ -129,41 +137,65 @@ public static String getJsonWithUpdatedResources(String jsonConf, MaptopologyResources, String com, - Map topologyConf) { -checkInitMem(topologyResources, com, topologyConf); -checkInitCpu(topologyResources, com, topologyConf); -} +public static void checkInitialization(Map topologyResources, String componentId, Map topologyConf) { +StringBuilder msgBuilder = new StringBuilder(); -private static void checkInitMem(Map topologyResources, String com, -Map topologyConf) { -if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) { -Double onHeap = ObjectReader.getDouble( - topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null); -if (onHeap != null) { - topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap); -debugMessage("ONHEAP", com, topologyConf); -} +Set resourceNameSet = new HashSet<>(); + +resourceNameSet.add( +Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT +); +resourceNameSet.add( +Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB +); +resourceNameSet.add( +Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB +); + +Map topologyComponentResourcesMap = +(Map ) topologyConf.getOrDefault( +Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, Collections.emptyMap()); + +resourceNameSet.addAll(topologyResources.keySet()); +resourceNameSet.addAll(topologyComponentResourcesMap.keySet()); + +for (String resourceName : resourceNameSet) { +msgBuilder.append(checkInitResource(topologyResources, topologyConf, topologyComponentResourcesMap, resourceName)); } -if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) { -Double offHeap = ObjectReader.getDouble( - 
topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null); -if (offHeap != null) { - topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap); -debugMessage("OFFHEAP", com, topologyConf); -} + +Map normalizedTopologyResources = normalizedResourceMap(topologyResources); +topologyResources.clear(); +topologyResources.putAll(normalizedTopologyResources); + +if (msgBuilder.length() > 0) { +String resourceDefaults = msgBuilder.toString(); +LOG.debug( +"Unable to extract resource requirement for Component {} \n Resources : {}", +componentId, resourceDefaults); } } -private static void checkInitCpu(Map topologyResources, String com, - Map topologyConf) { -if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) { -Double cpu = ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null); -if (cpu != null) { - topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, cpu); -debugMessage("CPU", com, topologyConf); +private static String checkInitResource(Map topologyResources, Map topologyConf, +Map topologyComponentResourcesMap, String resourceName) { +StringBuilder msgBuilder = new StringBuilder(); +String normalizedResourceName = resourceNameMapping.getOrDefault(resourceName, resourceName); +if (!topologyResources.containsKey(normalizedResourceName)) { +if (topologyConf.containsKey(resourceName)) { +Double resourceValue = ObjectReader.getDouble(topologyConf.get(resourceName)); +if(resourceValue!=null) { --- End diff -- nit: space after the if. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149162183 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java --- @@ -993,6 +1045,31 @@ public WorkerResources getWorkerResources(WorkerSlot ws) { } @Override +public MapgetAllScheduledResourcesForNode(String nodeId) { +Map totalScheduledResources = new HashMap<>(); +for (SchedulerAssignmentImpl assignment : assignments.values()) { +for (Entry entry : +assignment.getScheduledResources().entrySet()) { +if (nodeId.equals(entry.getKey().getNodeId())) { +WorkerResources resources = entry.getValue(); +for (Map.Entry resourceEntry : resources.get_resources().entrySet()) { +Double currentResourceValue = totalScheduledResources.containsKey(resourceEntry.getKey()) ? totalScheduledResources.get(resourceEntry.getKey()) : 0.0; + totalScheduledResources.put(resourceEntry.getKey().toString(), currentResourceValue + ObjectReader.getDouble(resourceEntry.getValue())); --- End diff -- Why calling toString on a String? ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149162313 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java --- @@ -993,6 +1045,31 @@ public WorkerResources getWorkerResources(WorkerSlot ws) { } @Override +public MapgetAllScheduledResourcesForNode(String nodeId) { +Map totalScheduledResources = new HashMap<>(); +for (SchedulerAssignmentImpl assignment : assignments.values()) { +for (Entry entry : +assignment.getScheduledResources().entrySet()) { +if (nodeId.equals(entry.getKey().getNodeId())) { +WorkerResources resources = entry.getValue(); +for (Map.Entry resourceEntry : resources.get_resources().entrySet()) { +Double currentResourceValue = totalScheduledResources.containsKey(resourceEntry.getKey()) ? totalScheduledResources.get(resourceEntry.getKey()) : 0.0; --- End diff -- This is actually the perfect place to call getOrDefault. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149163670 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java --- @@ -57,7 +64,8 @@ if (topology.get_spouts() != null) { for (Map.Entry spout : topology.get_spouts().entrySet()) { Map topologyResources = parseResources(spout.getValue().get_common().get_json_conf()); -checkIntialization(topologyResources, spout.getValue().toString(), topologyConf); +checkInitialization(topologyResources, spout.getKey(), topologyConf); +LOG.warn("Turned {} into {}", spout.getValue().get_common().get_json_conf(), topologyResources); --- End diff -- Can we delete this and the one on line 54 too? It looks like a debug statement that accidentally slipped into the code. Truth be told, I think it is my debug statement that slipped into this... ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149164648 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java --- @@ -238,4 +277,57 @@ public static double sum(Collection list) { public static double avg(Collection list) { return sum(list) / list.size(); } + +/** + * Normalizes a supervisor resource map or topology details map's keys to universal resource names. + * @param resourceMap resource map of either Supervisor or Topology + * @return the resource map with common resource names + */ +public static MapnormalizedResourceMap(Map resourceMap) { +Map result = new HashMap(); + +result.putAll(resourceMap); +for(Map.Entry entry: resourceMap.entrySet()) { --- End diff -- nit: space after the for. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149163756 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java --- @@ -129,41 +137,65 @@ public static String getJsonWithUpdatedResources(String jsonConf, MaptopologyResources, String com, - Map topologyConf) { -checkInitMem(topologyResources, com, topologyConf); -checkInitCpu(topologyResources, com, topologyConf); -} +public static void checkInitialization(Map topologyResources, String componentId, Map topologyConf) { --- End diff -- Why are we losing the type hints on topologyConf? ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149140827 --- Diff: examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java --- @@ -430,37 +427,39 @@ public static void main(String[] args) throws Exception { return topologyResources; } -static void checkIntialization(MaptopologyResources, String com, - Map topologyConf) { -checkInitMem(topologyResources, com, topologyConf); -checkInitCpu(topologyResources, com, topologyConf); -} +/** + * Checks if the topology's resource requirements are initialized. + * @param topologyResources map of resouces requirements --- End diff -- nit: could we add a comment that topologyResources will be modified if not properly initialized? Just so it is clear, because that is not a typical thing to do in a java method. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149160853 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java --- @@ -73,11 +75,23 @@ private SupervisorInfo buildSupervisorInfo(Mapconf, Supervisor private Map mkSupervisorCapacities(Map conf) { Map ret = new HashMap (); +// Put in legacy values Double mem = ObjectReader.getDouble(conf.get(Config.SUPERVISOR_MEMORY_CAPACITY_MB), 4096.0); ret.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem); Double cpu = ObjectReader.getDouble(conf.get(Config.SUPERVISOR_CPU_CAPACITY), 400.0); ret.put(Config.SUPERVISOR_CPU_CAPACITY, cpu); -return ret; + + +// If configs are present in Generic map and legacy - the legacy values will be overwritten +Map resourcesMap = (Map ) conf.get(Config.SUPERVISOR_RESOURCES_MAP); --- End diff -- This is not guaranteed to be a Double. It is guaranteed to be a Number. So we need to convert it with something like. ``` if (resourcesMap != null) { for (Map.Entry stringNumberEntry : resourcesMap.entrySet()) { ret.put(stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue()); } } ``` ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149159775 --- Diff: storm-client/src/jvm/org/apache/storm/transactional/TransactionalTopologyBuilder.java --- @@ -534,9 +559,33 @@ public BoltDeclarer addConfigurations(Mapconf) { } @Override +public Map getRASConfiguration() { +for(Map conf : _component.componentConfs) { +if (conf.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) { +return conf; +} +} +return new HashMap<>(); +} + +@Override public BoltDeclarer addSharedMemory(SharedMemory request) { _component.sharedMemory.add(request); return this; } + +@SuppressWarnings("unchecked") +@Override +public BoltDeclarer addResource(String resourceName, Number resourceValue) { +Map resourcesMap = (Map ) getRASConfiguration().get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP); + +if (resourcesMap == null) { +resourcesMap = new HashMap<>(); +} +resourcesMap.put(resourceName, resourceValue.doubleValue()); + + getRASConfiguration().put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, resourcesMap); --- End diff -- And here. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149159593 --- Diff: storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java --- @@ -576,10 +577,30 @@ public T addConfigurations(Mapconf) { throw new IllegalArgumentException("Cannot set serializations for a component using fluent API"); } String currConf = _commons.get(_id).get_json_conf(); - _commons.get(_id).set_json_conf(mergeIntoJson(Utils.parseJson(currConf), conf)); + _commons.get(_id).set_json_conf(mergeIntoJson(parseJson(currConf), conf)); return (T) this; } - + +@SuppressWarnings("unchecked") +@Override +public T addResource(String resourceName, Number resourceValue) { +Map resourcesMap = (Map ) getRASConfiguration().get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP); + +if (resourcesMap == null) { +resourcesMap = new HashMap<>(); +} +resourcesMap.put(resourceName, resourceValue.doubleValue()); + + getRASConfiguration().put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, resourcesMap); --- End diff -- Like with the others this is not going to work. the map you are changing is never put back into the serialized json config. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149159736 --- Diff: storm-client/src/jvm/org/apache/storm/transactional/TransactionalTopologyBuilder.java --- @@ -228,10 +229,34 @@ public SpoutDeclarer addConfigurations(Mapconf) { } @Override +public Map getRASConfiguration() { +for(Map conf : _spoutConfs) { +if (conf.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) { +return conf; +} +} +return new HashMap<>(); +} + +@Override public SpoutDeclarer addSharedMemory(SharedMemory request) { _spoutSharedMemory.add(request); return this; -} +} + +@SuppressWarnings("unchecked") +@Override +public SpoutDeclarer addResource(String resourceName, Number resourceValue) { +Map resourcesMap = (Map ) getRASConfiguration().get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP); + +if (resourcesMap == null) { +resourcesMap = new HashMap<>(); +} +resourcesMap.put(resourceName, resourceValue.doubleValue()); + + getRASConfiguration().put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, resourcesMap); --- End diff -- Here too I don't think this will work for the same reasons as elsewhere. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149144724 --- Diff: storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java --- @@ -450,9 +456,33 @@ public BoltDeclarer addConfigurations(Mapconf) { } @Override +public Map getRASConfiguration() { +for (Map conf : _component.componentConfs) { +if (conf.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) { +return conf; +} +} +return new HashMap<>(); +} + +@Override public BoltDeclarer addSharedMemory(SharedMemory request) { _component.sharedMemory.add(request); return this; } + +@SuppressWarnings("unchecked") +@Override +public BoltDeclarer addResource(String resourceName, Number resourceValue) { +Map resourcesMap = (Map ) getRASConfiguration().get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP); + +if (resourcesMap == null) { --- End diff -- The above code never returns a null. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149163179 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java --- @@ -479,6 +499,18 @@ private void addDefaultResforExec(ExecutorDetails exec) { adjustResourcesForExec(exec, defaultResourceList); +MaptopologyComponentResourcesMap = ( +Map ) this.topologyConf.getOrDefault( +Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>() +); + +assert topologyComponentResourcesMap != null; + +//topologyComponentResourcesMap = normalizedResourceMap(topologyComponentResourcesMap); --- End diff -- Not needed please delete. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149148257 --- Diff: storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java --- @@ -450,9 +456,33 @@ public BoltDeclarer addConfigurations(Map conf) { } @Override +public Map getRASConfiguration() { +for (Map conf : _component.componentConfs) { --- End diff -- OK, so this is far more complicated than it needs to be. I don't know why `_component.componentConfs` is a list of maps instead of just a map, because they will all be merged together in the final component. If you could refactor it to be just a single map, life would be a lot simpler. If not, then we should not just return a new empty hash map; we should add it to `_component.componentConfs`, or it is going to be lost. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149148502 --- Diff: storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java --- @@ -450,9 +456,33 @@ public BoltDeclarer addConfigurations(Mapconf) { } @Override +public Map getRASConfiguration() { +for (Map conf : _component.componentConfs) { +if (conf.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) { +return conf; +} +} +return new HashMap<>(); +} + +@Override public BoltDeclarer addSharedMemory(SharedMemory request) { _component.sharedMemory.add(request); return this; } + +@SuppressWarnings("unchecked") +@Override +public BoltDeclarer addResource(String resourceName, Number resourceValue) { +Map resourcesMap = (Map ) getRASConfiguration().get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP); + +if (resourcesMap == null) { +resourcesMap = new HashMap<>(); +} +resourcesMap.put(resourceName, resourceValue.doubleValue()); + + getRASConfiguration().put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, resourcesMap); --- End diff -- This does not work, see my comment above inside `getRASConfiguration` ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149160149 --- Diff: storm-client/src/storm.thrift --- @@ -496,6 +496,8 @@ struct WorkerResources { 3: optional double cpu; 4: optional double shared_mem_on_heap; //This is just for accounting mem_on_heap should be used for enforcement 5: optional double shared_mem_off_heap; //This is just for accounting mem_off_heap should be used for enforcement +6: optional mapresources; // Generic resources Map +7: optional map shared_resources; // Shared Generic resources Map --- End diff -- Are we using this? If not, let's hold off on adding it until we support it. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149161667 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java --- @@ -503,43 +553,42 @@ public boolean wouldFit( WorkerSlot ws, ExecutorDetails exec, TopologyDetails td, -double maxHeap, -double memoryAvailable, -double cpuAvailable) { -//NOTE this is called lots and lots by schedulers, so anything we can do to make it faster is going to help a lot. -//CPU is simplest because it does not have odd interactions. -double cpuNeeded = td.getTotalCpuReqTask(exec); -if (cpuNeeded > cpuAvailable) { -if (LOG.isTraceEnabled()) { -LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}", -td.getName(), -exec, -ws, -cpuNeeded, -cpuAvailable); -} -//Not enough CPU no need to try any more -return false; -} +MapresourcesAvailable, +double maxHeap) { -//Lets see if we can make the Memory one fast too, at least in the failure case. -//The totalMemReq is not really that accurate because it does not include shared memory, but if it does not fit we know -// Even with shared it will not work -double minMemNeeded = td.getTotalMemReqTask(exec); -if (minMemNeeded > memoryAvailable) { -if (LOG.isTraceEnabled()) { -LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > {}", td.getName(), exec, ws, minMemNeeded, memoryAvailable); +Map requestedResources = td.getTotalResources(exec); --- End diff -- Again I don't think this is guaranteed to be a Double. It needs to be a Number that we can then convert. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149161456 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java --- @@ -503,43 +553,42 @@ public boolean wouldFit( WorkerSlot ws, ExecutorDetails exec, TopologyDetails td, -double maxHeap, -double memoryAvailable, -double cpuAvailable) { -//NOTE this is called lots and lots by schedulers, so anything we can do to make it faster is going to help a lot. -//CPU is simplest because it does not have odd interactions. -double cpuNeeded = td.getTotalCpuReqTask(exec); -if (cpuNeeded > cpuAvailable) { -if (LOG.isTraceEnabled()) { -LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}", -td.getName(), -exec, -ws, -cpuNeeded, -cpuAvailable); -} -//Not enough CPU no need to try any more -return false; -} +MapresourcesAvailable, +double maxHeap) { -//Lets see if we can make the Memory one fast too, at least in the failure case. -//The totalMemReq is not really that accurate because it does not include shared memory, but if it does not fit we know -// Even with shared it will not work -double minMemNeeded = td.getTotalMemReqTask(exec); -if (minMemNeeded > memoryAvailable) { -if (LOG.isTraceEnabled()) { -LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > {}", td.getName(), exec, ws, minMemNeeded, memoryAvailable); +Map requestedResources = td.getTotalResources(exec); + +for (Entry resourceNeededEntry : requestedResources.entrySet()) { +String resourceName = resourceNeededEntry.getKey().toString(); +if (resourceName == Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME || resourceName == Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME) { --- End diff -- These need to be a .equals instead of ==. A lot of the time this might work, but there are cases with strings where it will not work. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149159863 --- Diff: storm-client/src/jvm/org/apache/storm/trident/topology/TridentTopologyBuilder.java --- @@ -763,9 +787,33 @@ public BoltDeclarer addConfigurations(Mapconf) { } @Override +public Map getRASConfiguration() { +for(Map conf : _component.componentConfs) { +if (conf.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) { +return conf; +} +} +return new HashMap<>(); +} + +@Override public BoltDeclarer addSharedMemory(SharedMemory request) { _component.sharedMemory.add(request); return this; } + +@SuppressWarnings("unchecked") +@Override +public BoltDeclarer addResource(String resourceName, Number resourceValue) { +Map resourcesMap = (Map ) getRASConfiguration().get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP); + +if (resourcesMap == null) { +resourcesMap = new HashMap<>(); +} +resourcesMap.put(resourceName, resourceValue.doubleValue()); + + getRASConfiguration().put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, resourcesMap); --- End diff -- And here... ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r149161816 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java --- @@ -503,43 +553,42 @@ public boolean wouldFit( WorkerSlot ws, ExecutorDetails exec, TopologyDetails td, -double maxHeap, -double memoryAvailable, -double cpuAvailable) { -//NOTE this is called lots and lots by schedulers, so anything we can do to make it faster is going to help a lot. -//CPU is simplest because it does not have odd interactions. -double cpuNeeded = td.getTotalCpuReqTask(exec); -if (cpuNeeded > cpuAvailable) { -if (LOG.isTraceEnabled()) { -LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}", -td.getName(), -exec, -ws, -cpuNeeded, -cpuAvailable); -} -//Not enough CPU no need to try any more -return false; -} +MapresourcesAvailable, +double maxHeap) { -//Lets see if we can make the Memory one fast too, at least in the failure case. -//The totalMemReq is not really that accurate because it does not include shared memory, but if it does not fit we know -// Even with shared it will not work -double minMemNeeded = td.getTotalMemReqTask(exec); -if (minMemNeeded > memoryAvailable) { -if (LOG.isTraceEnabled()) { -LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > {}", td.getName(), exec, ws, minMemNeeded, memoryAvailable); +Map requestedResources = td.getTotalResources(exec); + +for (Entry resourceNeededEntry : requestedResources.entrySet()) { +String resourceName = resourceNeededEntry.getKey().toString(); +if (resourceName == Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME || resourceName == Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME) { +continue; +} +Double resourceNeeded = ObjectReader.getDouble(resourceNeededEntry.getValue()); +Double resourceAvailable = ObjectReader.getDouble( +resourcesAvailable.getOrDefault(resourceName, null), 0.0); --- End diff -- getOrDefault with the second argument a null is the same as calling get. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user govind-menon commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r148045890 --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/SupervisorDetails.java --- @@ -17,12 +17,10 @@ */ package org.apache.storm.scheduler; -import java.util.Collection; -import java.util.HashSet; -import java.util.Set; -import java.util.Map; +import java.util.*; --- End diff -- @Ethanlm Thanks! ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r147755614 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java --- @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.scheduler.resource.strategies.scheduling; + +import com.google.common.annotations.VisibleForTesting; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; + +import org.apache.storm.Config; +import org.apache.storm.scheduler.Cluster; +import org.apache.storm.scheduler.Component; +import org.apache.storm.scheduler.ExecutorDetails; +import org.apache.storm.scheduler.TopologyDetails; +import org.apache.storm.scheduler.resource.ResourceUtils; +import org.apache.storm.scheduler.resource.SchedulingResult; +import org.apache.storm.scheduler.resource.SchedulingStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy implements IStrategy { +private static final Logger LOG = LoggerFactory.getLogger(GenericResourceAwareStrategy.class); + +@Override +public SchedulingResult schedule(Cluster cluster, TopologyDetails td) { +prepare(cluster); +if (nodes.getNodes().size() <= 0) { +LOG.warn("No available nodes to schedule tasks on!"); +return SchedulingResult.failure( +SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!"); +} +Collection unassignedExecutors = +new HashSet<>(this.cluster.getUnassignedExecutors(td)); +LOG.info("ExecutorsNeedScheduling: {}", unassignedExecutors); +Collection scheduledTasks = new ArrayList<>(); +List spouts = this.getSpouts(td); + +if (spouts.size() == 0) { +LOG.error("Cannot find a Spout!"); +return SchedulingResult.failure( +SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!"); +} + +//order executors to be scheduled +List orderedExecutors = this.orderExecutors(td, unassignedExecutors); +LOG.info("orderedExecutors"); +LOG.info(orderedExecutors. 
toString()); +Collection executorsNotScheduled = new HashSet<>(unassignedExecutors); +List favoredNodes = (List) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES); +List unFavoredNodes = (List) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES); + +for (ExecutorDetails exec : orderedExecutors) { +LOG.debug( +"Attempting to schedule: {} of component {}[ REQ {} ]", +exec, +td.getExecutorToComponent().get(exec), +td.getTaskResourceReqList(exec)); +final List sortedNodes = this.sortAllNodes(td, exec, favoredNodes, unFavoredNodes); +LOG.info("sortedNodes"); +LOG.info(sortedNodes.toString()); + +scheduleExecutor(exec, td, scheduledTasks, sortedNodes); +} + +executorsNotScheduled.removeAll(scheduledTasks); +LOG.error("/* Scheduling left over task (most likely sys tasks) */"); +// schedule left over system tasks +for (ExecutorDetails exec : executorsNotScheduled) { +final List sortedNodes = this.sortAllNodes(td, exec, favoredNodes, unFavoredNodes); +LOG.info("sortedNodes"); +LOG.info(sortedNodes.toString()); --- End diff -- same here ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r147737087 --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/SupervisorDetails.java --- @@ -17,12 +17,10 @@ */ package org.apache.storm.scheduler; -import java.util.Collection; -import java.util.HashSet; -import java.util.Set; -import java.util.Map; +import java.util.*; --- End diff -- I believe this is done automatically by IntelliJ. You can change it in the preferences: Preferences --> Editor --> Code Style --> Java --> Imports, and then change "class count to use import with '*'" to a very large number, e.g. 999. Then you should never have this problem. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r147755026 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java --- @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.scheduler.resource.strategies.scheduling; + +import com.google.common.annotations.VisibleForTesting; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; + +import org.apache.storm.Config; +import org.apache.storm.scheduler.Cluster; +import org.apache.storm.scheduler.Component; +import org.apache.storm.scheduler.ExecutorDetails; +import org.apache.storm.scheduler.TopologyDetails; +import org.apache.storm.scheduler.resource.ResourceUtils; +import org.apache.storm.scheduler.resource.SchedulingResult; +import org.apache.storm.scheduler.resource.SchedulingStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy implements IStrategy { +private static final Logger LOG = LoggerFactory.getLogger(GenericResourceAwareStrategy.class); + +@Override +public SchedulingResult schedule(Cluster cluster, TopologyDetails td) { +prepare(cluster); +if (nodes.getNodes().size() <= 0) { +LOG.warn("No available nodes to schedule tasks on!"); +return SchedulingResult.failure( +SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!"); +} +Collection unassignedExecutors = +new HashSet<>(this.cluster.getUnassignedExecutors(td)); +LOG.info("ExecutorsNeedScheduling: {}", unassignedExecutors); +Collection scheduledTasks = new ArrayList<>(); +List spouts = this.getSpouts(td); + +if (spouts.size() == 0) { +LOG.error("Cannot find a Spout!"); +return SchedulingResult.failure( +SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!"); +} + +//order executors to be scheduled +List orderedExecutors = this.orderExecutors(td, unassignedExecutors); +LOG.info("orderedExecutors"); +LOG.info(orderedExecutors. toString()); --- End diff -- Is it better to put them in one LOG.info()? 
(I am not sure if LOG.debug() is better here) ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r147755367 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java --- @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.scheduler.resource.strategies.scheduling; + +import com.google.common.annotations.VisibleForTesting; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; + +import org.apache.storm.Config; +import org.apache.storm.scheduler.Cluster; +import org.apache.storm.scheduler.Component; +import org.apache.storm.scheduler.ExecutorDetails; +import org.apache.storm.scheduler.TopologyDetails; +import org.apache.storm.scheduler.resource.ResourceUtils; +import org.apache.storm.scheduler.resource.SchedulingResult; +import org.apache.storm.scheduler.resource.SchedulingStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy implements IStrategy { +private static final Logger LOG = LoggerFactory.getLogger(GenericResourceAwareStrategy.class); + +@Override +public SchedulingResult schedule(Cluster cluster, TopologyDetails td) { +prepare(cluster); +if (nodes.getNodes().size() <= 0) { +LOG.warn("No available nodes to schedule tasks on!"); +return SchedulingResult.failure( +SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!"); +} +Collection unassignedExecutors = +new HashSet<>(this.cluster.getUnassignedExecutors(td)); +LOG.info("ExecutorsNeedScheduling: {}", unassignedExecutors); +Collection scheduledTasks = new ArrayList<>(); +List spouts = this.getSpouts(td); + +if (spouts.size() == 0) { +LOG.error("Cannot find a Spout!"); +return SchedulingResult.failure( +SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!"); +} + +//order executors to be scheduled +List orderedExecutors = this.orderExecutors(td, unassignedExecutors); +LOG.info("orderedExecutors"); +LOG.info(orderedExecutors. 
toString()); +Collection executorsNotScheduled = new HashSet<>(unassignedExecutors); +List favoredNodes = (List) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES); +List unFavoredNodes = (List) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES); + +for (ExecutorDetails exec : orderedExecutors) { +LOG.debug( +"Attempting to schedule: {} of component {}[ REQ {} ]", +exec, +td.getExecutorToComponent().get(exec), +td.getTaskResourceReqList(exec)); +final List sortedNodes = this.sortAllNodes(td, exec, favoredNodes, unFavoredNodes); +LOG.info("sortedNodes"); +LOG.info(sortedNodes.toString()); + --- End diff -- same here ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
GitHub user govind-menon reopened a pull request: https://github.com/apache/storm/pull/2385 STORM-2727: Generic Resource Aware Scheduling Remaining: 1. Add tests 2. Do more manual testing You can merge this pull request into a Git repository by running: $ git pull https://github.com/govind-menon/storm YSTORM-2727 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2385.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2385 commit facf515b121c80eed6c02d74104e323cfb9e4a1d Author: Govind Menon Date: 2017-09-07T18:50:05Z YSTORM-2725: Generic Resource Scheduling - initial config changes and TopologyBuilder API commit 4337d44b4f95da34f2b1d0a4160353ab81c0fab0 Author: Govind Menon Date: 2017-10-23T15:31:08Z YSTORM-2727: Generic Resource Aware Scheduling(GRAS) - metadata, scheduling strategies and configuration enhancements commit 8e867fcf9183e8476658bff7119d9de8545e9aed Author: Govind Menon Date: 2017-10-23T15:31:08Z YSTORM-2727: Generic Resource Aware Scheduling(GRAS) - metadata, scheduling strategies and configuration enhancements ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user govind-menon closed the pull request at: https://github.com/apache/storm/pull/2385 ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r147236780 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java --- @@ -664,6 +722,9 @@ private double calculateSharedOffHeapMemory(String nodeId, SchedulerAssignmentIm private double calculateSharedOffHeapMemory( String nodeId, SchedulerAssignmentImpl assignment, ExecutorDetails extra) { +if (assignment == null) { --- End diff -- When is the assignment null??? Why would we try to calculate the shared off heap memory for no assignment??? ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r147237164 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java --- @@ -168,6 +172,15 @@ private void initResourceList() { // topology.getbolt (AKA sys tasks most specifically __acker tasks) for (ExecutorDetails exec : getExecutors()) { if (!resourceList.containsKey(exec)) { +// TODO: Update this debug statement --- End diff -- Either update it or remove the TODO ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r147234270 --- Diff: storm-client/src/jvm/org/apache/storm/Config.java --- @@ -240,6 +240,12 @@ public static final String TOPOLOGY_TASKS = "topology.tasks"; /** + * A map of resources used by each component e.g {"cpu" : 200.0. "onheap.memory.mb": 256.0, "gpu" : 0.5 } + */ +@isMapEntryType(keyType = String.class, valueType = Double.class) --- End diff -- It is really hard to enforce a double in the conf. This is because neither the JSON nor the YAML parser knows about this; they put the parsed number in the smallest type that matches it, i.e., 1 becomes an integer, not a double. 1.0 might be a double, but it might also be a float, I don't know. If the number is really big it could be a long. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r147234853 --- Diff: storm-client/src/jvm/org/apache/storm/Constants.java --- @@ -56,5 +58,20 @@ public static final String STORM_ACTIVE_ATOM = "storm-active-atom"; public static final String COMPONENT_TO_DEBUG_ATOM = "storm-component->debug-atom"; public static final Object LOAD_MAPPING = "load-mapping"; + +public static final String COMMON_CPU_RESOURCE_NAME = "cpu.pcore.percent"; +public static final String COMMON_ONHEAP_MEMORY_RESOURCE_NAME = "onheap.memory.mb"; +public static final String COMMON_OFFHEAP_MEMORY_RESOURCE_NAME = "offheap.memory.mb"; +public static final String COMMON_TOTAL_MEMORY_RESOURCE_NAME = "memory.mb"; --- End diff -- Can we document these in some place? For example the configs show some examples, but they are not exhaustive, and the rebalance command is really bad at this so a user has no idea what to put in as the key. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r147235417 --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/SupervisorDetails.java --- @@ -17,12 +17,10 @@ */ package org.apache.storm.scheduler; -import java.util.Collection; -import java.util.HashSet; -import java.util.Set; -import java.util.Map; +import java.util.*; --- End diff -- I think .* is against the coding standard. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r147233694 --- Diff: examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java --- @@ -430,37 +427,37 @@ public static void main(String[] args) throws Exception { return topologyResources; } -static void checkIntialization(MaptopologyResources, String com, - Map topologyConf) { -checkInitMem(topologyResources, com, topologyConf); -checkInitCpu(topologyResources, com, topologyConf); -} +/** + * Checks if the topology's resource requirements are initialized. + * @param topologyResources map of resouces requirements + * @param componentId component for which initialization is being conducted + * @param topologyConf topology configuration + * @throws Exception on any error + */ +public static void checkInitialization(Map topologyResources, String componentId, Map topologyConf) { +StringBuilder msgBuilder = new StringBuilder(); -static void checkInitMem(Map topologyResources, String com, - Map topologyConf) { -if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) { -Double onHeap = ObjectReader.getDouble( - topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null); -if (onHeap != null) { - topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap); -} +for (String resourceName : topologyResources.keySet()) { +msgBuilder.append(checkInitResource(topologyResources, topologyConf, resourceName)); } -if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) { -Double offHeap = ObjectReader.getDouble( - topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null); -if (offHeap != null) { - topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap); -} + +if (msgBuilder.length() > 0) { +String resourceDefaults = msgBuilder.toString(); +LOG.debug( +"Unable to extract resource requirement 
for Component {} \n Resources : {}", +componentId, resourceDefaults); } } -static void checkInitCpu(Map topologyResources, String com, - Map topologyConf) { -if (!topologyResources.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) { -Double cpu = ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null); -if (cpu != null) { - topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, cpu); -} +private static String checkInitResource(Map topologyResources, Map topologyConf, String resourceName) { +StringBuilder msgBuilder = new StringBuilder(); +if (topologyResources.containsKey(resourceName)) { +Double resourceValue = (Double) topologyConf.getOrDefault(resourceName, null); --- End diff -- I think you missed something in the translation. I think it should be more like ``` if (topologyResources.containsKey(resourceName)) { Double resourceValue = ObjectReader.getDouble(topologyConf.get(resourceName), null); if (resourceValue != null) { topologyResources.put(resourceName, resourceValue); } ... ``` ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r147235680 --- Diff: storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java --- @@ -79,8 +81,18 @@ public T setMemoryLoad(Number onHeap, Number offHeap) { @Override public T setCPULoad(Number amount) { if(amount != null) { +addResource(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, amount); return addConfiguration(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, amount); } return (T) this; } + +@SuppressWarnings("unchecked") +@Override +public T addResources(Mapresources) { +if(resources != null) { --- End diff -- nit: you need a space after the if. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r147237442 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java --- @@ -189,13 +201,20 @@ private static void checkInitCpu(MaptopologyResources, String c null); topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu); } -LOG.debug("Topology Resources {}", topologyResources); + +// If resource is also present in resources map will overwrite the above +if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) { +topologyResources.putAll((Map ) jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)); +} +LOG.info("Topology Resources {}", topologyResources); } } catch (ParseException e) { LOG.error("Failed to parse component resources is:" + e.toString(), e); return null; } -return topologyResources; +LOG.info("Topology Resources {}", normalizedResourceMap(topologyResources)); +LOG.info("Topology Resources {}", normalizedResourceMap(topologyResources)); --- End diff -- Why do we need to log this twice? Or at all? ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r147236218 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java --- @@ -503,62 +553,70 @@ public boolean wouldFit( WorkerSlot ws, ExecutorDetails exec, TopologyDetails td, -double maxHeap, -double memoryAvailable, -double cpuAvailable) { -//NOTE this is called lots and lots by schedulers, so anything we can do to make it faster is going to help a lot. -//CPU is simplest because it does not have odd interactions. -double cpuNeeded = td.getTotalCpuReqTask(exec); -if (cpuNeeded > cpuAvailable) { -if (LOG.isTraceEnabled()) { -LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}", -td.getName(), -exec, -ws, -cpuNeeded, -cpuAvailable); +MapresourcesAvailable, +double maxHeap) { + +Map requestedResources = td.getTotalResources(exec); + +LOG.info(td.getName()); +LOG.info("requested"); +LOG.info(requestedResources.toString()); +LOG.info("available"); +LOG.info(resourcesAvailable.toString()); +LOG.info(ws.toString()); --- End diff -- I think this is debug logging. Can we please remove it? ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r147236953 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java --- @@ -993,6 +1054,31 @@ public WorkerResources getWorkerResources(WorkerSlot ws) { } @Override +public MapgetAllScheduledResourcesForNode(String nodeId) { +Map totalScheduledResources = new HashMap<>(); +for (SchedulerAssignmentImpl assignment : assignments.values()) { +for (Entry entry : +assignment.getScheduledResources().entrySet()) { +if (nodeId.equals(entry.getKey().getNodeId())) { +WorkerResources resources = entry.getValue(); +for (Map.Entry resourceEntry : resources.get_resources().entrySet()) { --- End diff -- Could you please add in the types for the Map.Entry? ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r147237298 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java --- @@ -18,22 +18,24 @@ package org.apache.storm.scheduler.resource; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; +import java.util.*; --- End diff -- Again, let's avoid wildcard (`.*`) imports. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r147235287 --- Diff: storm-client/src/jvm/org/apache/storm/generated/WorkerResources.java --- @@ -1,21 +1,4 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** * Autogenerated by Thrift Compiler (0.9.3) --- End diff -- What happened to the license header? Please run the shell script to generate the Thrift code instead of generating it by hand. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r147236417 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java --- @@ -503,62 +553,70 @@ public boolean wouldFit( WorkerSlot ws, ExecutorDetails exec, TopologyDetails td, -double maxHeap, -double memoryAvailable, -double cpuAvailable) { -//NOTE this is called lots and lots by schedulers, so anything we can do to make it faster is going to help a lot. -//CPU is simplest because it does not have odd interactions. -double cpuNeeded = td.getTotalCpuReqTask(exec); -if (cpuNeeded > cpuAvailable) { -if (LOG.isTraceEnabled()) { -LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}", -td.getName(), -exec, -ws, -cpuNeeded, -cpuAvailable); +MapresourcesAvailable, +double maxHeap) { + +Map requestedResources = td.getTotalResources(exec); + +LOG.info(td.getName()); +LOG.info("requested"); +LOG.info(requestedResources.toString()); +LOG.info("available"); +LOG.info(resourcesAvailable.toString()); +LOG.info(ws.toString()); +for (Entry resourceNeededEntry : requestedResources.entrySet()) { +String resourceName = resourceNeededEntry.getKey().toString(); +if (resourceName == Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME || resourceName == Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME) { +continue; } -//Not enough CPU no need to try any more -return false; -} - -//Lets see if we can make the Memory one fast too, at least in the failure case. 
-//The totalMemReq is not really that accurate because it does not include shared memory, but if it does not fit we know -// Even with shared it will not work -double minMemNeeded = td.getTotalMemReqTask(exec); -if (minMemNeeded > memoryAvailable) { -if (LOG.isTraceEnabled()) { -LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > {}", td.getName(), exec, ws, minMemNeeded, memoryAvailable); +Double resourceNeeded = ObjectReader.getDouble(resourceNeededEntry.getValue()); +Double resourceAvailable = ObjectReader.getDouble( +resourcesAvailable.getOrDefault(resourceName, null), 0.0); +if (resourceNeeded > resourceAvailable) { +if (true) { --- End diff -- This looks like a debug-logging change: `if (LOG.isTraceEnabled())` was replaced with `if (true)`. Do you want to change it back? ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r147234354 --- Diff: storm-client/src/jvm/org/apache/storm/Config.java --- @@ -1272,6 +1278,12 @@ public static final String SUPERVISOR_CPU_CAPACITY = "supervisor.cpu.capacity"; /** + * A map of resources the Supervisor has e.g {"cpu" : 200.0. "memory.capacity.mb": 256.0, "gpu" : 0.5 } + */ +@isMapEntryType(keyType = String.class, valueType = Double.class) --- End diff -- Same comment here. Trying to force the type to be a double is going to be difficult with how we use the configs. ---
[GitHub] storm pull request #2385: STORM-2727: Generic Resource Aware Scheduling
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r147236577 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java --- @@ -503,62 +553,70 @@ public boolean wouldFit( WorkerSlot ws, ExecutorDetails exec, TopologyDetails td, -double maxHeap, -double memoryAvailable, -double cpuAvailable) { -//NOTE this is called lots and lots by schedulers, so anything we can do to make it faster is going to help a lot. -//CPU is simplest because it does not have odd interactions. -double cpuNeeded = td.getTotalCpuReqTask(exec); -if (cpuNeeded > cpuAvailable) { -if (LOG.isTraceEnabled()) { -LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}", -td.getName(), -exec, -ws, -cpuNeeded, -cpuAvailable); +MapresourcesAvailable, +double maxHeap) { + +Map requestedResources = td.getTotalResources(exec); + +LOG.info(td.getName()); +LOG.info("requested"); +LOG.info(requestedResources.toString()); +LOG.info("available"); +LOG.info(resourcesAvailable.toString()); +LOG.info(ws.toString()); +for (Entry resourceNeededEntry : requestedResources.entrySet()) { +String resourceName = resourceNeededEntry.getKey().toString(); +if (resourceName == Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME || resourceName == Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME) { +continue; } -//Not enough CPU no need to try any more -return false; -} - -//Lets see if we can make the Memory one fast too, at least in the failure case. 
-//The totalMemReq is not really that accurate because it does not include shared memory, but if it does not fit we know -// Even with shared it will not work -double minMemNeeded = td.getTotalMemReqTask(exec); -if (minMemNeeded > memoryAvailable) { -if (LOG.isTraceEnabled()) { -LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > {}", td.getName(), exec, ws, minMemNeeded, memoryAvailable); +Double resourceNeeded = ObjectReader.getDouble(resourceNeededEntry.getValue()); +Double resourceAvailable = ObjectReader.getDouble( +resourcesAvailable.getOrDefault(resourceName, null), 0.0); +if (resourceNeeded > resourceAvailable) { +if (true) { +LOG.info("Could not schedule {}:{} on {} not enough {} {} > {}", +td.getName(), +exec, +ws, +resourceName, +resourceNeeded, +resourceAvailable); +} +//Not enough resources - stop trying +return false; } -//Not enough minimum MEM no need to try any more -return false; } double currentTotal = 0.0; double afterTotal = 0.0; double afterOnHeap = 0.0; + Set wouldBeAssigned = new HashSet<>(); wouldBeAssigned.add(exec); SchedulerAssignmentImpl assignment = assignments.get(td.getId()); + if (assignment != null) { Collection currentlyAssigned = assignment.getSlotToExecutors().get(ws); if (currentlyAssigned != null) { wouldBeAssigned.addAll(currentlyAssigned); WorkerResources wrCurrent = calculateWorkerResources(td, currentlyAssigned); currentTotal = wrCurrent.get_mem_off_heap() + wrCurrent.get_mem_on_heap(); } -WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned); -afterTotal = wrAfter.get_mem_off_heap() + wrAfter.get_mem_on_heap(); -afterOnHeap = wrAfter.get_mem_on_heap(); - -currentTotal += calculateSharedOffHeapMemory(ws.getNodeId(), assignment); -afterTotal += calculateSharedOffHeapMemory(ws.getNodeId(), assignment, exec); } +WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned); +afterTotal = wrAfter.get_mem_off_heap() + wrAfter.get_mem_on_heap(); +afterOnHeap = wrAfter.get_mem_on_heap(); + 
+currentTotal +=