Repository: storm
Updated Branches:
  refs/heads/master 9a4ea8134 -> 8ea90b9d3


http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 5b78bd2..31dc992 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.daemon.nimbus;
 
 import java.io.File;
@@ -97,6 +98,7 @@ import org.apache.storm.generated.NimbusSummary;
 import org.apache.storm.generated.NodeInfo;
 import org.apache.storm.generated.NotAliveException;
 import org.apache.storm.generated.NumErrorsChoice;
+import org.apache.storm.generated.OwnerResourceSummary;
 import org.apache.storm.generated.ProfileAction;
 import org.apache.storm.generated.ProfileRequest;
 import org.apache.storm.generated.ReadableBlobMeta;
@@ -139,6 +141,8 @@ import org.apache.storm.scheduler.SupervisorDetails;
 import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.multitenant.MultitenantScheduler;
+import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
 import org.apache.storm.scheduler.resource.ResourceUtils;
 import org.apache.storm.security.INimbusCredentialPlugin;
 import org.apache.storm.security.auth.AuthUtils;
@@ -213,6 +217,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     private static final Meter getTopologyPageInfoCalls = 
StormMetricsRegistry.registerMeter("nimbus:num-getTopologyPageInfo-calls");
     private static final Meter getSupervisorPageInfoCalls = 
StormMetricsRegistry.registerMeter("nimbus:num-getSupervisorPageInfo-calls");
     private static final Meter getComponentPageInfoCalls = 
StormMetricsRegistry.registerMeter("nimbus:num-getComponentPageInfo-calls");
+    private static final Meter getOwnerResourceSummariesCalls = 
StormMetricsRegistry.registerMeter("nimbus:num-getOwnerResourceSummaries-calls");
     private static final Meter shutdownCalls = 
StormMetricsRegistry.registerMeter("nimbus:num-shutdown-calls");
     // END Metrics
     
@@ -1661,7 +1666,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 double sumOnHeap = 0.0;
                 double sumOffHeap = 0.0;
                 double sumCPU = 0.0;
-                
+
                 Assignment assignment = state.assignmentInfo(topoId, null);
                 if (assignment != null) {
                     if (assignment.is_set_worker_resources()) {
@@ -3824,6 +3829,130 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             return false;
         }
     }
+
+    /**
+     * Builds one resource summary per topology owner: topology, worker, executor and task counts,
+     * requested and assigned memory/CPU, and any guarantee the scheduler configuration holds for
+     * that owner.
+     *
+     * @param owner the owner to summarize; when {@code null}, every owner of a topology and every
+     *     key of the scheduler configuration is summarized
+     * @return one summary per owner; a non-null {@code owner} always yields exactly one summary,
+     *     even if no topology belongs to it
+     * @throws AuthorizationException declared for the authorization check (presumably raised by
+     *     checkAuthorization; confirm against its implementation)
+     * @throws TException rethrown as-is; any other exception is wrapped in a RuntimeException
+     */
+    @Override
+    public List<OwnerResourceSummary> getOwnerResourceSummaries(String owner)
+            throws AuthorizationException, TException {
+        try {
+            getOwnerResourceSummariesCalls.mark();
+            checkAuthorization(null, null, "getOwnerResourceSummaries");
+            IStormClusterState state = stormClusterState;
+            Map<String, Assignment> topoIdToAssignments = state.topologyAssignments();
+            Map<String, StormBase> topoIdToBases = state.topologyBases();
+            Map<String, Object> clusterSchedulerConfig = scheduler.config();
+
+            // owner -> StormBases of the topologies that owner runs.
+            // If the owner parameter is null, the map covers all owners that have a StormBase or a
+            // scheduler-config entry; otherwise it holds only the requested owner.
+            Map<String, List<StormBase>> ownerToBasesMap = new HashMap<>();
+
+            if (owner == null) {
+                for (StormBase base : topoIdToBases.values()) {
+                    ownerToBasesMap.computeIfAbsent(base.get_owner(), ignored -> new ArrayList<>())
+                            .add(base);
+                }
+                // Owners that appear only in the scheduler configuration get an empty topology list.
+                for (String ownerWithGuarantees : clusterSchedulerConfig.keySet()) {
+                    ownerToBasesMap.computeIfAbsent(ownerWithGuarantees, ignored -> new ArrayList<>());
+                }
+            } else {
+                List<StormBase> stormbases = new ArrayList<>();
+                for (StormBase base : topoIdToBases.values()) {
+                    if (owner.equals(base.get_owner())) {
+                        stormbases.add(base);
+                    }
+                }
+                ownerToBasesMap.put(owner, stormbases);
+            }
+
+            List<OwnerResourceSummary> ret = new ArrayList<>();
+
+            // For each owner: aggregate resources and counts, then attach the configured guarantees.
+            for (Entry<String, List<StormBase>> ownerToBasesEntry : ownerToBasesMap.entrySet()) {
+                String theOwner = ownerToBasesEntry.getKey();
+                TopologyResources totalResourcesAggregate =
+                        new TopologyResources(0.0, 0.0, 0.0, 0.0, 0.0, 0.0);
+
+                int totalExecutors = 0;
+                int totalWorkers = 0;
+                int totalTasks = 0;
+
+                for (StormBase base : ownerToBasesEntry.getValue()) {
+                    try {
+                        String topoId = state.getTopoId(base.get_name())
+                                .orElseThrow(() -> new NotAliveException(base.get_name() + " is not alive"));
+                        TopologyResources resources = getResourcesForTopology(topoId, base);
+                        totalResourcesAggregate = totalResourcesAggregate.add(resources);
+                        Assignment ownerAssignment = topoIdToAssignments.get(topoId);
+                        if (ownerAssignment != null && ownerAssignment.get_executor_node_port() != null) {
+                            Map<List<Long>, ?> executorToNodePort = ownerAssignment.get_executor_node_port();
+                            totalExecutors += executorToNodePort.size();
+                            // Distinct node/port values = number of workers used by this topology.
+                            totalWorkers += new HashSet<Object>(executorToNodePort.values()).size();
+                            for (List<Long> executorId : executorToNodePort.keySet()) {
+                                totalTasks += StormCommon.executorIdToTasks(executorId).size();
+                            }
+                        }
+                    } catch (NotAliveException e) {
+                        // A topology that vanished meanwhile contributes nothing to the totals.
+                        LOG.warn("{} is not alive.", base.get_name());
+                    }
+                }
+
+                double requestedTotalMemory = totalResourcesAggregate.getRequestedMemOnHeap()
+                        + totalResourcesAggregate.getRequestedMemOffHeap();
+                double assignedTotalMemory = totalResourcesAggregate.getAssignedMemOnHeap()
+                        + totalResourcesAggregate.getAssignedMemOffHeap();
+
+                OwnerResourceSummary ownerResourceSummary = new OwnerResourceSummary(theOwner);
+                ownerResourceSummary.set_total_topologies(ownerToBasesEntry.getValue().size());
+                ownerResourceSummary.set_total_executors(totalExecutors);
+                ownerResourceSummary.set_total_workers(totalWorkers);
+                ownerResourceSummary.set_total_tasks(totalTasks);
+                ownerResourceSummary.set_memory_usage(assignedTotalMemory);
+                ownerResourceSummary.set_cpu_usage(totalResourcesAggregate.getAssignedCpu());
+                ownerResourceSummary.set_requested_on_heap_memory(
+                        totalResourcesAggregate.getRequestedMemOnHeap());
+                ownerResourceSummary.set_requested_off_heap_memory(
+                        totalResourcesAggregate.getRequestedMemOffHeap());
+                ownerResourceSummary.set_requested_total_memory(requestedTotalMemory);
+                ownerResourceSummary.set_requested_cpu(totalResourcesAggregate.getRequestedCpu());
+                ownerResourceSummary.set_assigned_on_heap_memory(
+                        totalResourcesAggregate.getAssignedMemOnHeap());
+                ownerResourceSummary.set_assigned_off_heap_memory(
+                        totalResourcesAggregate.getAssignedMemOffHeap());
+
+                if (clusterSchedulerConfig.containsKey(theOwner)) {
+                    if (scheduler instanceof ResourceAwareScheduler) {
+                        @SuppressWarnings("unchecked")
+                        Map<String, Object> schedulerConfig =
+                                (Map<String, Object>) clusterSchedulerConfig.get(theOwner);
+                        if (schedulerConfig != null) {
+                            // Convert through Number: a direct (double) cast of an Object goes via
+                            // Double and throws ClassCastException for an Integer (such as a literal
+                            // 0 default) or any other non-Double value.
+                            double memoryGuarantee =
+                                    ((Number) schedulerConfig.getOrDefault("memory", 0.0)).doubleValue();
+                            double cpuGuarantee =
+                                    ((Number) schedulerConfig.getOrDefault("cpu", 0.0)).doubleValue();
+                            ownerResourceSummary.set_memory_guarantee(memoryGuarantee);
+                            ownerResourceSummary.set_cpu_guarantee(cpuGuarantee);
+                            ownerResourceSummary.set_memory_guarantee_remaining(
+                                    memoryGuarantee - ownerResourceSummary.get_memory_usage());
+                            ownerResourceSummary.set_cpu_guarantee_remaining(
+                                    cpuGuarantee - ownerResourceSummary.get_cpu_usage());
+                        }
+                    } else if (scheduler instanceof MultitenantScheduler) {
+                        // Same reasoning as above: accept any Number, not only Integer.
+                        ownerResourceSummary.set_isolated_node_guarantee(
+                                ((Number) clusterSchedulerConfig.getOrDefault(theOwner, 0)).intValue());
+                    }
+                }
+
+                LOG.debug("{}", ownerResourceSummary);
+                ret.add(ownerResourceSummary);
+            }
+
+            return ret;
+        } catch (Exception e) {
+            // Pass the exception as the last argument so the stack trace reaches the log.
+            LOG.warn("Get owner resource summaries exception. (owner = '{}')", owner, e);
+            if (e instanceof TException) {
+                throw (TException) e;
+            }
+            throw new RuntimeException(e);
+        }
+    }
     
     // Shutdownable methods
     

http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java
index 0b87052..97afa97 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java
@@ -28,13 +28,12 @@ public final class TopologyResources {
     public TopologyResources(Double requestedMemOnHeap, Double 
requestedMemOffHeap,
             Double requestedCpu, Double assignedMemOnHeap, Double 
assignedMemOffHeap,
             Double assignedCpu) {
-                this.requestedMemOnHeap = requestedMemOnHeap;
-                this.requestedMemOffHeap = requestedMemOffHeap;
-                this.requestedCpu = requestedCpu;
-                this.assignedMemOnHeap = assignedMemOnHeap;
-                this.assignedMemOffHeap = assignedMemOffHeap;
-                this.assignedCpu = assignedCpu;
-        
+        this.requestedMemOnHeap = requestedMemOnHeap;
+        this.requestedMemOffHeap = requestedMemOffHeap;
+        this.requestedCpu = requestedCpu;
+        this.assignedMemOnHeap = assignedMemOnHeap;
+        this.assignedMemOffHeap = assignedMemOffHeap;
+        this.assignedCpu = assignedCpu;
     }
 
     public Double getRequestedMemOnHeap() {
@@ -60,4 +59,13 @@ public final class TopologyResources {
     public Double getAssignedCpu() {
         return assignedCpu;
     }
+
+    /**
+     * Returns a new TopologyResources holding the field-wise sums of this instance and
+     * {@code other}.
+     *
+     * @param other the resources to add to this instance
+     * @return a new instance carrying the six summed values
+     */
+    public TopologyResources add(TopologyResources other) {
+        final double reqOnHeap = this.requestedMemOnHeap + other.requestedMemOnHeap;
+        final double reqOffHeap = this.requestedMemOffHeap + other.requestedMemOffHeap;
+        final double reqCpu = this.requestedCpu + other.requestedCpu;
+        final double asgOnHeap = this.assignedMemOnHeap + other.assignedMemOnHeap;
+        final double asgOffHeap = this.assignedMemOffHeap + other.assignedMemOffHeap;
+        final double asgCpu = this.assignedCpu + other.assignedCpu;
+        return new TopologyResources(reqOnHeap, reqOffHeap, reqCpu, asgOnHeap, asgOffHeap, asgCpu);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
index 1b1ea85..3908416 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
@@ -108,4 +108,8 @@ public class DefaultScheduler implements IScheduler {
         defaultSchedule(topologies, cluster);
     }
 
+    /**
+     * Returns a new, empty map on every call.
+     */
+    @Override
+    public Map<String, Object> config() {
+        final Map<String, Object> noSettings = new HashMap<>();
+        return noSettings;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
index 0a985ae..3ad2648 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
@@ -169,4 +169,9 @@ public class EvenScheduler implements IScheduler {
         scheduleTopologiesEvenly(topologies, cluster);
     }
 
+    /**
+     * Returns a new, empty map on every call.
+     */
+    @Override
+    public Map<String, Object> config() {
+        final Map<String, Object> emptyConfig = new HashMap<>();
+        return emptyConfig;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java
index e821cf6..e10b8e8 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java
@@ -56,6 +56,11 @@ public class IsolationScheduler implements IScheduler {
         Validate.notEmpty(isoMachines);
     }
 
+    /**
+     * Returns a new, empty map on every call.
+     */
+    @Override
+    public Map<String, Object> config() {
+        final Map<String, Object> nothingConfigured = new HashMap<>();
+        return nothingConfigured;
+    }
+
     // get host -> all assignable worker slots for non-blacklisted machines (assigned or not assigned)
     // will then have a list of machines that need to be assigned (machine -> [topology, list of list of executors])
     // match each spec to a machine (who has the right number of workers), free everything else on that machine and assign those slots (do one topology at a time)

http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
index 5bbf8de..e0f57cd 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
@@ -58,6 +58,11 @@ public class MultitenantScheduler implements IScheduler {
     return ret;
   }
 
+
+  /**
+   * Returns the result of getUserConf(), viewed through a raw cast as the scheduler's config map.
+   */
+  @Override
+  public Map<String, Object> config() {
+    final Map userConf = (Map) getUserConf();
+    return userConf;
+  }
  
   @Override
   public void schedule(Topologies topologies, Cluster cluster) {

http://git-wip-us.apache.org/repos/asf/storm/blob/e5d6d32b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index 0b9e800..f92db8a 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -59,6 +59,11 @@ public class ResourceAwareScheduler implements IScheduler {
     }
 
     @Override
+    public Map<String, Object> config() {
+        // Returns the result of getUserResourcePools(), viewed through a raw cast as the config map.
+        final Map userResourcePools = (Map) getUserResourcePools();
+        return userResourcePools;
+    }
+
+    @Override
     public void schedule(Topologies topologies, Cluster cluster) {
         LOG.debug("\n\n\nRerunning ResourceAwareScheduler...");
         //initialize data structures

Reply via email to