Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2385#discussion_r147236577
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
    @@ -503,62 +553,70 @@ public boolean wouldFit(
             WorkerSlot ws,
             ExecutorDetails exec,
             TopologyDetails td,
    -        double maxHeap,
    -        double memoryAvailable,
    -        double cpuAvailable) {
    -        //NOTE this is called lots and lots by schedulers, so anything we 
can do to make it faster is going to help a lot.
    -        //CPU is simplest because it does not have odd interactions.
    -        double cpuNeeded = td.getTotalCpuReqTask(exec);
    -        if (cpuNeeded > cpuAvailable) {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("Could not schedule {}:{} on {} not enough CPU 
{} > {}",
    -                    td.getName(),
    -                    exec,
    -                    ws,
    -                    cpuNeeded,
    -                    cpuAvailable);
    +        Map<String, Double> resourcesAvailable,
    +        double maxHeap) {
    +
    +        Map<String, Double> requestedResources = 
td.getTotalResources(exec);
    +
    +        LOG.info(td.getName());
    +        LOG.info("requested");
    +        LOG.info(requestedResources.toString());
    +        LOG.info("available");
    +        LOG.info(resourcesAvailable.toString());
    +        LOG.info(ws.toString());
    +        for (Entry resourceNeededEntry : requestedResources.entrySet()) {
    +            String resourceName = resourceNeededEntry.getKey().toString();
    +            if (resourceName == 
Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME || resourceName == 
Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME) {
    +                continue;
                 }
    -            //Not enough CPU no need to try any more
    -            return false;
    -        }
    -
    -        //Lets see if we can make the Memory one fast too, at least in the 
failure case.
    -        //The totalMemReq is not really that accurate because it does not 
include shared memory, but if it does not fit we know
    -        // Even with shared it will not work
    -        double minMemNeeded = td.getTotalMemReqTask(exec);
    -        if (minMemNeeded > memoryAvailable) {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("Could not schedule {}:{} on {} not enough Mem 
{} > {}", td.getName(), exec, ws, minMemNeeded, memoryAvailable);
    +            Double resourceNeeded = 
ObjectReader.getDouble(resourceNeededEntry.getValue());
    +            Double resourceAvailable = ObjectReader.getDouble(
    +                    resourcesAvailable.getOrDefault(resourceName, null), 
0.0);
    +            if (resourceNeeded > resourceAvailable) {
    +                if (true) {
    +                    LOG.info("Could not schedule {}:{} on {} not enough {} 
{} > {}",
    +                            td.getName(),
    +                            exec,
    +                            ws,
    +                            resourceName,
    +                            resourceNeeded,
    +                            resourceAvailable);
    +                }
    +                //Not enough resources - stop trying
    +                return false;
                 }
    -            //Not enough minimum MEM no need to try any more
    -            return false;
             }
     
             double currentTotal = 0.0;
             double afterTotal = 0.0;
             double afterOnHeap = 0.0;
    +
             Set<ExecutorDetails> wouldBeAssigned = new HashSet<>();
             wouldBeAssigned.add(exec);
             SchedulerAssignmentImpl assignment = assignments.get(td.getId());
    +
             if (assignment != null) {
                 Collection<ExecutorDetails> currentlyAssigned = 
assignment.getSlotToExecutors().get(ws);
                 if (currentlyAssigned != null) {
                     wouldBeAssigned.addAll(currentlyAssigned);
                     WorkerResources wrCurrent = calculateWorkerResources(td, 
currentlyAssigned);
                     currentTotal = wrCurrent.get_mem_off_heap() + 
wrCurrent.get_mem_on_heap();
                 }
    -            WorkerResources wrAfter = calculateWorkerResources(td, 
wouldBeAssigned);
    -            afterTotal = wrAfter.get_mem_off_heap() + 
wrAfter.get_mem_on_heap();
    -            afterOnHeap = wrAfter.get_mem_on_heap();
    -
    -            currentTotal += calculateSharedOffHeapMemory(ws.getNodeId(), 
assignment);
    -            afterTotal += calculateSharedOffHeapMemory(ws.getNodeId(), 
assignment, exec);
             }
    +        WorkerResources wrAfter = calculateWorkerResources(td, 
wouldBeAssigned);
    +        afterTotal = wrAfter.get_mem_off_heap() + 
wrAfter.get_mem_on_heap();
    +        afterOnHeap = wrAfter.get_mem_on_heap();
    +
    +        currentTotal += calculateSharedOffHeapMemory(ws.getNodeId(), 
assignment);
    +        afterTotal += calculateSharedOffHeapMemory(ws.getNodeId(), 
assignment, exec);
     
             double memoryAdded = afterTotal - currentTotal;
    +        double memoryAvailable = ObjectReader.getDouble(
    +                
resourcesAvailable.getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 
null), 0.0);
    +
             if (memoryAdded > memoryAvailable) {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("Could not schedule {}:{} on {} not enough Mem 
{} > {}",
    +            if (true) {
    +                LOG.info("Could not schedule {}:{} on {} not enough Mem {} 
> {}",
    --- End diff --
    
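    There are several places like this, where `if (LOG.isTraceEnabled())` / `LOG.trace(...)` 
was replaced with `if (true)` / `LOG.info(...)`. I think the logging in those places needs 
to go back to how it was before.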


---

Reply via email to