Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2385#discussion_r147236577 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java --- @@ -503,62 +553,70 @@ public boolean wouldFit( WorkerSlot ws, ExecutorDetails exec, TopologyDetails td, - double maxHeap, - double memoryAvailable, - double cpuAvailable) { - //NOTE this is called lots and lots by schedulers, so anything we can do to make it faster is going to help a lot. - //CPU is simplest because it does not have odd interactions. - double cpuNeeded = td.getTotalCpuReqTask(exec); - if (cpuNeeded > cpuAvailable) { - if (LOG.isTraceEnabled()) { - LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}", - td.getName(), - exec, - ws, - cpuNeeded, - cpuAvailable); + Map<String, Double> resourcesAvailable, + double maxHeap) { + + Map<String, Double> requestedResources = td.getTotalResources(exec); + + LOG.info(td.getName()); + LOG.info("requested"); + LOG.info(requestedResources.toString()); + LOG.info("available"); + LOG.info(resourcesAvailable.toString()); + LOG.info(ws.toString()); + for (Entry resourceNeededEntry : requestedResources.entrySet()) { + String resourceName = resourceNeededEntry.getKey().toString(); + if (resourceName == Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME || resourceName == Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME) { + continue; } - //Not enough CPU no need to try any more - return false; - } - - //Lets see if we can make the Memory one fast too, at least in the failure case. 
- //The totalMemReq is not really that accurate because it does not include shared memory, but if it does not fit we know - // Even with shared it will not work - double minMemNeeded = td.getTotalMemReqTask(exec); - if (minMemNeeded > memoryAvailable) { - if (LOG.isTraceEnabled()) { - LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > {}", td.getName(), exec, ws, minMemNeeded, memoryAvailable); + Double resourceNeeded = ObjectReader.getDouble(resourceNeededEntry.getValue()); + Double resourceAvailable = ObjectReader.getDouble( + resourcesAvailable.getOrDefault(resourceName, null), 0.0); + if (resourceNeeded > resourceAvailable) { + if (true) { + LOG.info("Could not schedule {}:{} on {} not enough {} {} > {}", + td.getName(), + exec, + ws, + resourceName, + resourceNeeded, + resourceAvailable); + } + //Not enough resources - stop trying + return false; } - //Not enough minimum MEM no need to try any more - return false; } double currentTotal = 0.0; double afterTotal = 0.0; double afterOnHeap = 0.0; + Set<ExecutorDetails> wouldBeAssigned = new HashSet<>(); wouldBeAssigned.add(exec); SchedulerAssignmentImpl assignment = assignments.get(td.getId()); + if (assignment != null) { Collection<ExecutorDetails> currentlyAssigned = assignment.getSlotToExecutors().get(ws); if (currentlyAssigned != null) { wouldBeAssigned.addAll(currentlyAssigned); WorkerResources wrCurrent = calculateWorkerResources(td, currentlyAssigned); currentTotal = wrCurrent.get_mem_off_heap() + wrCurrent.get_mem_on_heap(); } - WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned); - afterTotal = wrAfter.get_mem_off_heap() + wrAfter.get_mem_on_heap(); - afterOnHeap = wrAfter.get_mem_on_heap(); - - currentTotal += calculateSharedOffHeapMemory(ws.getNodeId(), assignment); - afterTotal += calculateSharedOffHeapMemory(ws.getNodeId(), assignment, exec); } + WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned); + afterTotal = wrAfter.get_mem_off_heap() + 
+ wrAfter.get_mem_on_heap(); + afterOnHeap = wrAfter.get_mem_on_heap(); + + currentTotal += calculateSharedOffHeapMemory(ws.getNodeId(), assignment); + afterTotal += calculateSharedOffHeapMemory(ws.getNodeId(), assignment, exec); double memoryAdded = afterTotal - currentTotal; + double memoryAvailable = ObjectReader.getDouble( + resourcesAvailable.getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, null), 0.0); + if (memoryAdded > memoryAvailable) { - if (LOG.isTraceEnabled()) { - LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > {}", + if (true) { + LOG.info("Could not schedule {}:{} on {} not enough Mem {} > {}", --- End diff -- There are several places like this where the logging needs to be reverted to how it was before: the `if (true)` guards and `LOG.info` calls look like leftover debugging and should go back to the `LOG.isTraceEnabled()` check with `LOG.trace` (this method is called very frequently by the schedulers, so unconditional INFO logging here is expensive and noisy).
---