Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2385#discussion_r147236577
--- Diff:
storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---
@@ -503,62 +553,70 @@ public boolean wouldFit(
WorkerSlot ws,
ExecutorDetails exec,
TopologyDetails td,
- double maxHeap,
- double memoryAvailable,
- double cpuAvailable) {
- //NOTE this is called lots and lots by schedulers, so anything we
can do to make it faster is going to help a lot.
- //CPU is simplest because it does not have odd interactions.
- double cpuNeeded = td.getTotalCpuReqTask(exec);
- if (cpuNeeded > cpuAvailable) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Could not schedule {}:{} on {} not enough CPU
{} > {}",
- td.getName(),
- exec,
- ws,
- cpuNeeded,
- cpuAvailable);
+ Map<String, Double> resourcesAvailable,
+ double maxHeap) {
+
+ Map<String, Double> requestedResources =
td.getTotalResources(exec);
+
+ LOG.info(td.getName());
+ LOG.info("requested");
+ LOG.info(requestedResources.toString());
+ LOG.info("available");
+ LOG.info(resourcesAvailable.toString());
+ LOG.info(ws.toString());
+ for (Entry resourceNeededEntry : requestedResources.entrySet()) {
+ String resourceName = resourceNeededEntry.getKey().toString();
+ if (resourceName ==
Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME || resourceName ==
Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME) {
+ continue;
}
- //Not enough CPU no need to try any more
- return false;
- }
-
- //Lets see if we can make the Memory one fast too, at least in the
failure case.
- //The totalMemReq is not really that accurate because it does not
include shared memory, but if it does not fit we know
- // Even with shared it will not work
- double minMemNeeded = td.getTotalMemReqTask(exec);
- if (minMemNeeded > memoryAvailable) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Could not schedule {}:{} on {} not enough Mem
{} > {}", td.getName(), exec, ws, minMemNeeded, memoryAvailable);
+ Double resourceNeeded =
ObjectReader.getDouble(resourceNeededEntry.getValue());
+ Double resourceAvailable = ObjectReader.getDouble(
+ resourcesAvailable.getOrDefault(resourceName, null),
0.0);
+ if (resourceNeeded > resourceAvailable) {
+ if (true) {
+ LOG.info("Could not schedule {}:{} on {} not enough {}
{} > {}",
+ td.getName(),
+ exec,
+ ws,
+ resourceName,
+ resourceNeeded,
+ resourceAvailable);
+ }
+ //Not enough resources - stop trying
+ return false;
}
- //Not enough minimum MEM no need to try any more
- return false;
}
double currentTotal = 0.0;
double afterTotal = 0.0;
double afterOnHeap = 0.0;
+
Set<ExecutorDetails> wouldBeAssigned = new HashSet<>();
wouldBeAssigned.add(exec);
SchedulerAssignmentImpl assignment = assignments.get(td.getId());
+
if (assignment != null) {
Collection<ExecutorDetails> currentlyAssigned =
assignment.getSlotToExecutors().get(ws);
if (currentlyAssigned != null) {
wouldBeAssigned.addAll(currentlyAssigned);
WorkerResources wrCurrent = calculateWorkerResources(td,
currentlyAssigned);
currentTotal = wrCurrent.get_mem_off_heap() +
wrCurrent.get_mem_on_heap();
}
- WorkerResources wrAfter = calculateWorkerResources(td,
wouldBeAssigned);
- afterTotal = wrAfter.get_mem_off_heap() +
wrAfter.get_mem_on_heap();
- afterOnHeap = wrAfter.get_mem_on_heap();
-
- currentTotal += calculateSharedOffHeapMemory(ws.getNodeId(),
assignment);
- afterTotal += calculateSharedOffHeapMemory(ws.getNodeId(),
assignment, exec);
}
+ WorkerResources wrAfter = calculateWorkerResources(td,
wouldBeAssigned);
+ afterTotal = wrAfter.get_mem_off_heap() +
wrAfter.get_mem_on_heap();
+ afterOnHeap = wrAfter.get_mem_on_heap();
+
+ currentTotal += calculateSharedOffHeapMemory(ws.getNodeId(),
assignment);
+ afterTotal += calculateSharedOffHeapMemory(ws.getNodeId(),
assignment, exec);
double memoryAdded = afterTotal - currentTotal;
+ double memoryAvailable = ObjectReader.getDouble(
+
resourcesAvailable.getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME,
null), 0.0);
+
if (memoryAdded > memoryAvailable) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Could not schedule {}:{} on {} not enough Mem
{} > {}",
+ if (true) {
+ LOG.info("Could not schedule {}:{} on {} not enough Mem {}
> {}",
--- End diff --
There are several places like this where the logging needs to go back
to how it was before: the debug-level `LOG.info` calls (and the
`if (true)` guards) should be restored to `LOG.trace` wrapped in
`LOG.isTraceEnabled()`, since `wouldFit` is on a hot scheduling path.
The unconditional `LOG.info` dumps of the requested/available resource
maps near the top of the method should be removed as well.
---