Repository: storm Updated Branches: refs/heads/master 9a4ea8134 -> 8ea90b9d3
http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java index 5b78bd2..31dc992 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.daemon.nimbus; import java.io.File; @@ -97,6 +98,7 @@ import org.apache.storm.generated.NimbusSummary; import org.apache.storm.generated.NodeInfo; import org.apache.storm.generated.NotAliveException; import org.apache.storm.generated.NumErrorsChoice; +import org.apache.storm.generated.OwnerResourceSummary; import org.apache.storm.generated.ProfileAction; import org.apache.storm.generated.ProfileRequest; import org.apache.storm.generated.ReadableBlobMeta; @@ -139,6 +141,8 @@ import org.apache.storm.scheduler.SupervisorDetails; import org.apache.storm.scheduler.Topologies; import org.apache.storm.scheduler.TopologyDetails; import org.apache.storm.scheduler.WorkerSlot; +import org.apache.storm.scheduler.multitenant.MultitenantScheduler; +import org.apache.storm.scheduler.resource.ResourceAwareScheduler; import org.apache.storm.scheduler.resource.ResourceUtils; import org.apache.storm.security.INimbusCredentialPlugin; import org.apache.storm.security.auth.AuthUtils; @@ -213,6 +217,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { private static final Meter getTopologyPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getTopologyPageInfo-calls"); private static final Meter getSupervisorPageInfoCalls = 
StormMetricsRegistry.registerMeter("nimbus:num-getSupervisorPageInfo-calls"); private static final Meter getComponentPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getComponentPageInfo-calls"); + private static final Meter getOwnerResourceSummariesCalls = StormMetricsRegistry.registerMeter("nimbus:num-getOwnerResourceSummaries-calls"); private static final Meter shutdownCalls = StormMetricsRegistry.registerMeter("nimbus:num-shutdown-calls"); // END Metrics @@ -1661,7 +1666,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { double sumOnHeap = 0.0; double sumOffHeap = 0.0; double sumCPU = 0.0; - + Assignment assignment = state.assignmentInfo(topoId, null); if (assignment != null) { if (assignment.is_set_worker_resources()) { @@ -3824,6 +3829,130 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { return false; } } + + @Override + public List<OwnerResourceSummary> getOwnerResourceSummaries(String owner) throws AuthorizationException, TException { + try { + getOwnerResourceSummariesCalls.mark(); + checkAuthorization(null, null, "getOwnerResourceSummaries"); + IStormClusterState state = stormClusterState; + Map<String, Assignment> topoIdToAssignments = state.topologyAssignments(); + Map<String, StormBase> topoIdToBases = state.topologyBases(); + Map<String, Object> clusterSchedulerConfig = scheduler.config(); + + //put [owner-> StormBase-list] mapping to ownerToBasesMap + //if this owner (the input parameter) is null, add all the owners with stormbase and guarantees + //else, add only this owner (the input parameter) to the map + Map<String, List<StormBase>> ownerToBasesMap = new HashMap<>(); + + if (owner == null){ + // add all the owners to the map + for (StormBase base: topoIdToBases.values()) { + String baseOwner = base.get_owner(); + if (!ownerToBasesMap.containsKey(baseOwner)) { + List<StormBase> stormbases = new ArrayList<>(); + stormbases.add(base); + ownerToBasesMap.put(baseOwner, stormbases); + } else { + 
ownerToBasesMap.get(baseOwner).add(base); + } + } + //in addition, add all the owners with guarantees + List<String> ownersWithGuarantees = new ArrayList<>(clusterSchedulerConfig.keySet()); + for (String ownerWithGuarantees: ownersWithGuarantees) { + if (!ownerToBasesMap.containsKey(ownerWithGuarantees)) { + ownerToBasesMap.put(ownerWithGuarantees, new ArrayList<>()); + } + } + } else { + //only put this owner to the map + List<StormBase> stormbases = new ArrayList<>(); + for (StormBase base: topoIdToBases.values()) { + if (owner.equals(base.get_owner())) { + stormbases.add(base); + } + } + ownerToBasesMap.put(owner, stormbases); + } + + List<OwnerResourceSummary> ret = new ArrayList<>(); + + //for each owner, get resources, configs, and aggregate + for (Entry<String, List<StormBase>> ownerToBasesEntry: ownerToBasesMap.entrySet()) { + String theOwner = ownerToBasesEntry.getKey(); + TopologyResources totalResourcesAggregate = new TopologyResources(0.0, 0.0, 0.0, 0.0, 0.0, 0.0); + + int totalExecutors = 0; + int totalWorkers = 0; + int totalTasks = 0; + + for (StormBase base: ownerToBasesEntry.getValue()) { + try { + String topoId = state.getTopoId(base.get_name()) + .orElseThrow(() -> new NotAliveException(base.get_name() + " is not alive")); + TopologyResources resources = getResourcesForTopology(topoId, base); + totalResourcesAggregate = totalResourcesAggregate.add(resources); + Assignment ownerAssignment = topoIdToAssignments.get(topoId); + if (ownerAssignment != null && ownerAssignment.get_executor_node_port() != null) { + totalExecutors += ownerAssignment.get_executor_node_port().keySet().size(); + totalWorkers += new HashSet(ownerAssignment.get_executor_node_port().values()).size(); + for (List<Long> executorId : ownerAssignment.get_executor_node_port().keySet()) { + totalTasks += StormCommon.executorIdToTasks(executorId).size(); + } + } + } catch (NotAliveException e) { + LOG.warn("{} is not alive.", base.get_name()); + } + } + + double requestedTotalMemory = 
totalResourcesAggregate.getRequestedMemOnHeap() + + totalResourcesAggregate.getRequestedMemOffHeap(); + double assignedTotalMemory = totalResourcesAggregate.getAssignedMemOnHeap() + + totalResourcesAggregate.getAssignedMemOffHeap(); + + OwnerResourceSummary ownerResourceSummary = new OwnerResourceSummary(theOwner); + ownerResourceSummary.set_total_topologies(ownerToBasesEntry.getValue().size()); + ownerResourceSummary.set_total_executors(totalExecutors); + ownerResourceSummary.set_total_workers(totalWorkers); + ownerResourceSummary.set_total_tasks(totalTasks); + ownerResourceSummary.set_memory_usage(assignedTotalMemory); + ownerResourceSummary.set_cpu_usage(totalResourcesAggregate.getAssignedCpu()); + ownerResourceSummary.set_requested_on_heap_memory(totalResourcesAggregate.getRequestedMemOnHeap()); + ownerResourceSummary.set_requested_off_heap_memory(totalResourcesAggregate.getRequestedMemOffHeap()); + ownerResourceSummary.set_requested_total_memory(requestedTotalMemory); + ownerResourceSummary.set_requested_cpu(totalResourcesAggregate.getRequestedCpu()); + ownerResourceSummary.set_assigned_on_heap_memory(totalResourcesAggregate.getAssignedMemOnHeap()); + ownerResourceSummary.set_assigned_off_heap_memory(totalResourcesAggregate.getAssignedMemOffHeap()); + + if (clusterSchedulerConfig.containsKey(theOwner)) { + if (scheduler instanceof ResourceAwareScheduler) { + Map<String, Object> schedulerConfig = (Map) clusterSchedulerConfig.get(theOwner); + if (schedulerConfig != null) { + ownerResourceSummary.set_memory_guarantee((double)schedulerConfig.getOrDefault("memory", 0)); + ownerResourceSummary.set_cpu_guarantee((double)schedulerConfig.getOrDefault("cpu", 0)); + ownerResourceSummary.set_memory_guarantee_remaining(ownerResourceSummary.get_memory_guarantee() + - ownerResourceSummary.get_memory_usage()); + ownerResourceSummary.set_cpu_guarantee_remaining(ownerResourceSummary.get_cpu_guarantee() + - ownerResourceSummary.get_cpu_usage()); + } + } else if (scheduler 
instanceof MultitenantScheduler) { + ownerResourceSummary.set_isolated_node_guarantee((int) clusterSchedulerConfig.getOrDefault(theOwner, 0)); + } + } + + LOG.debug("{}",ownerResourceSummary.toString()); + ret.add(ownerResourceSummary); + } + + return ret; + } catch (Exception e) { + LOG.warn("Get owner resource summaries exception. (owner = '{}')", owner); + if (e instanceof TException) { + throw (TException)e; + } + throw new RuntimeException(e); + } + } // Shutdownable methods http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java index 0b87052..97afa97 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java @@ -28,13 +28,12 @@ public final class TopologyResources { public TopologyResources(Double requestedMemOnHeap, Double requestedMemOffHeap, Double requestedCpu, Double assignedMemOnHeap, Double assignedMemOffHeap, Double assignedCpu) { - this.requestedMemOnHeap = requestedMemOnHeap; - this.requestedMemOffHeap = requestedMemOffHeap; - this.requestedCpu = requestedCpu; - this.assignedMemOnHeap = assignedMemOnHeap; - this.assignedMemOffHeap = assignedMemOffHeap; - this.assignedCpu = assignedCpu; - + this.requestedMemOnHeap = requestedMemOnHeap; + this.requestedMemOffHeap = requestedMemOffHeap; + this.requestedCpu = requestedCpu; + this.assignedMemOnHeap = assignedMemOnHeap; + this.assignedMemOffHeap = assignedMemOffHeap; + this.assignedCpu = assignedCpu; } public Double getRequestedMemOnHeap() { @@ -60,4 +59,13 @@ public final class TopologyResources { public Double getAssignedCpu() { return assignedCpu; } + + 
public TopologyResources add(TopologyResources other) { + return new TopologyResources(this.requestedMemOnHeap + other.requestedMemOnHeap, + this.requestedMemOffHeap + other.requestedMemOffHeap, + this.requestedCpu + other.requestedCpu, + this.assignedMemOnHeap + other.assignedMemOnHeap, + this.assignedMemOffHeap + other.assignedMemOffHeap, + this.assignedCpu + other.assignedCpu); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java index 1b1ea85..3908416 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java @@ -108,4 +108,8 @@ public class DefaultScheduler implements IScheduler { defaultSchedule(topologies, cluster); } + @Override + public Map<String, Object> config() { + return new HashMap<>(); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java index 0a985ae..3ad2648 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java @@ -169,4 +169,9 @@ public class EvenScheduler implements IScheduler { scheduleTopologiesEvenly(topologies, cluster); } + @Override + public Map<String, Object> config() { + return new HashMap<>(); + } + } 
http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java index e821cf6..e10b8e8 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java @@ -56,6 +56,11 @@ public class IsolationScheduler implements IScheduler { Validate.notEmpty(isoMachines); } + @Override + public Map<String, Object> config() { + return new HashMap<>(); + } + // get host -> all assignable worker slots for non-blacklisted machines (assigned or not assigned) // will then have a list of machines that need to be assigned (machine -> [topology, list of list of executors]) // match each spec to a machine (who has the right number of workers), free everything else on that machine and assign those slots (do one topology at a time) http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java index 5bbf8de..e0f57cd 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java @@ -58,6 +58,11 @@ public class MultitenantScheduler implements IScheduler { return ret; } + + @Override + public Map<String, Object> config() { + return (Map) getUserConf(); + } @Override public void 
schedule(Topologies topologies, Cluster cluster) { http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java index 0b9e800..f92db8a 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java @@ -59,6 +59,11 @@ public class ResourceAwareScheduler implements IScheduler { } @Override + public Map<String, Object> config() { + return (Map) getUserResourcePools(); + } + + @Override public void schedule(Topologies topologies, Cluster cluster) { LOG.debug("\n\n\nRerunning ResourceAwareScheduler..."); //initialize data structures