STORM-1994: Add table with per-topology and worker resource usage and 
components in (new) supervisor and topology pages


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f5ad9288
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f5ad9288
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f5ad9288

Branch: refs/heads/master
Commit: f5ad92888b6f1470e4d7f179fbe5ae73e37a07f7
Parents: 726f3ea
Author: Alessandro Bellina <[email protected]>
Authored: Wed Jul 6 14:23:18 2016 -0500
Committer: Alessandro Bellina <[email protected]>
Committed: Mon Jul 25 23:21:23 2016 -0500

----------------------------------------------------------------------
 docs/STORM-UI-REST-API.md                       |  121 +-
 docs/images/supervisor_page.png                 |  Bin 0 -> 133290 bytes
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  327 +-
 storm-core/src/clj/org/apache/storm/ui/core.clj |   94 +-
 .../jvm/org/apache/storm/generated/Nimbus.java  | 3486 ++++++++++++------
 .../storm/generated/SupervisorPageInfo.java     |  624 ++++
 .../storm/generated/TopologyPageInfo.java       |  284 +-
 .../apache/storm/generated/WorkerSummary.java   | 1880 ++++++++++
 .../jvm/org/apache/storm/scheduler/Cluster.java |  217 +-
 .../resource/ResourceAwareScheduler.java        |    9 +
 .../auth/authorizer/SimpleACLAuthorizer.java    |    7 +-
 .../jvm/org/apache/storm/stats/StatsUtil.java   |  132 +
 storm-core/src/py/storm/Nimbus-remote           |    7 +
 storm-core/src/py/storm/Nimbus.py               |  272 +-
 storm-core/src/py/storm/ttypes.py               | 1457 ++++++--
 storm-core/src/storm.thrift                     |   25 +
 storm-core/src/ui/public/component.html         |    8 +
 storm-core/src/ui/public/css/style.css          |   20 +
 storm-core/src/ui/public/js/script.js           |  191 +
 storm-core/src/ui/public/supervisor.html        |  132 +
 .../public/templates/index-page-template.html   |    4 +-
 .../templates/supervisor-page-template.html     |  145 +
 .../templates/topology-page-template.html       |  208 +-
 storm-core/src/ui/public/topology.html          |   12 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   |   79 +-
 .../org/apache/storm/stats/TestStatsUtil.java   |  266 ++
 26 files changed, 8352 insertions(+), 1655 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f5ad9288/docs/STORM-UI-REST-API.md
----------------------------------------------------------------------
diff --git a/docs/STORM-UI-REST-API.md b/docs/STORM-UI-REST-API.md
index a2fc39a..2a844a5 100644
--- a/docs/STORM-UI-REST-API.md
+++ b/docs/STORM-UI-REST-API.md
@@ -125,6 +125,7 @@ Response fields:
 |uptimeSeconds| Integer| Shows how long the supervisor is running in seconds|
 |slotsTotal| Integer| Total number of available worker slots for this 
supervisor|
 |slotsUsed| Integer| Number of worker slots used on this supervisor|
+|schedulerDisplayResource| Boolean | Whether to display scheduler resource 
information|
 |totalMem| Double| Total memory capacity on this supervisor|
 |totalCpu| Double| Total CPU capacity on this supervisor|
 |usedMem| Double| Used memory capacity on this supervisor|
@@ -207,6 +208,123 @@ Sample response:
 }
 ```
 
+### /api/v1/supervisor (GET)
+
+Returns summary for a supervisor by id, or all supervisors running on a host.
+
+Examples:
+
+```no-highlight
+ 1. By host: 
http://ui-daemon-host-name:8080/api/v1/supervisor?host=supervisor-daemon-host-name
+ 2. By id: 
http://ui-daemon-host-name:8080/api/v1/supervisor?id=f5449110-1daa-43e2-89e3-69917b16dec9-192.168.1.1
+```
+
+Request parameters:
+
+|Parameter |Value   |Description  |
+|----------|--------|-------------|
+|id       |String. Supervisor id | If specified, respond with the stats for the 
supervisor with this id and for its workers. Note that when id is specified, the host argument is 
ignored. |
+|host      |String. Host name| If specified, respond with the stats for all supervisors and 
workers on the host (normally just one supervisor)|
+|sys       |String. Values 1 or 0. Default value 0| Controls including sys 
stats part of the response|
+
+Response fields:
+
+|Field  |Value|Description|
+|---   |---    |---
+|supervisors| Array| Array of supervisor summaries|
+|workers| Array| Array of worker summaries |
+|schedulerDisplayResource| Boolean | Whether to display scheduler resource 
information|
+
+Each supervisor is defined by:
+
+|Field  |Value|Description|
+|---   |---    |---
+|id| String | Supervisor's id|
+|host| String| Supervisor's host name|
+|uptime| String| Shows how long the supervisor has been running|
+|uptimeSeconds| Integer| Shows how long the supervisor has been running in seconds|
+|slotsTotal| Integer| Total number of worker slots for this supervisor|
+|slotsUsed| Integer| Number of worker slots used on this supervisor|
+|totalMem| Double| Total memory capacity on this supervisor|
+|totalCpu| Double| Total CPU capacity on this supervisor|
+|usedMem| Double| Used memory capacity on this supervisor|
+|usedCpu| Double| Used CPU capacity on this supervisor|
+
+Each worker is defined by:
+
+|Field  |Value  |Description|
+|-------|-------|-----------|
+|supervisorId | String| Supervisor's id|
+|host | String | Worker's host name|
+|port | Integer | Worker's port|
+|topologyId | String | Topology Id|
+|topologyName | String | Topology Name|
+|executorsTotal | Integer | Number of executors used by the topology in this 
worker|
+|assignedMemOnHeap | Double | Assigned On-Heap Memory by Scheduler (MB)|
+|assignedMemOffHeap | Double | Assigned Off-Heap Memory by Scheduler (MB)|
+|assignedCpu | Double | Assigned CPU by Scheduler (%)|
+|componentNumTasks | Dictionary | Components -> # of executing tasks|
+|uptime| String| Shows how long the worker has been running|
+|uptimeSeconds| Integer| Shows how long the worker has been running in seconds|
+|workerLogLink | String | Link to worker log viewer page|
+
+Sample response:
+
+```json
+{
+    "supervisors": [{ 
+        "totalMem": 4096.0, 
+        "host":"192.168.10.237",
+        "id":"bdfe8eff-f1d8-4bce-81f5-9d3ae1bf432e-169.254.129.212",
+        "uptime":"7m 8s",
+        "totalCpu":400.0,
+        "usedCpu":495.0,
+        "usedMem":3432.0,
+        "slotsUsed":2,
+        "version":"0.10.1",
+        "slotsTotal":4,
+        "uptimeSeconds":428
+    }],
+    "schedulerDisplayResource":true,
+    "workers":[{
+        "topologyName":"ras",
+        "topologyId":"ras-4-1460229987",
+        "host":"192.168.10.237",
+        "supervisorId":"bdfe8eff-f1d8-4bce-81f5-9d3ae1bf432e-169.254.129.212",
+        "assignedMemOnHeap":704.0,
+        "uptime":"2m 47s",
+        "uptimeSeconds":167,
+        "port":6707,
+        
"workerLogLink":"http:\/\/192.168.10.237:8000\/log?file=ras-4-1460229987%2F6707%2Fworker.log",
+        "componentNumTasks": {
+            "word":5
+        },
+        "executorsTotal":8,
+        "assignedCpu":130.0,
+        "assignedMemOffHeap":80.0
+    },
+    {
+        "topologyName":"ras",
+        "topologyId":"ras-4-1460229987",
+        "host":"192.168.10.237",
+        "supervisorId":"bdfe8eff-f1d8-4bce-81f5-9d3ae1bf432e-169.254.129.212",
+        "assignedMemOnHeap":904.0,
+        "uptime":"2m 53s",
+        "port":6706,
+        
"workerLogLink":"http:\/\/192.168.10.237:8000\/log?file=ras-4-1460229987%2F6706%2Fworker.log",
+        "componentNumTasks":{
+            "exclaim2":2,
+            "exclaim1":3,
+            "word":5
+        },
+        "executorsTotal":10,
+        "uptimeSeconds":173,
+        "assignedCpu":165.0,
+        "assignedMemOffHeap":80.0
+    }]
+}
+```
+
 ### /api/v1/topology/summary (GET)
 
 Returns summary information for all topologies.
@@ -232,6 +350,7 @@ Response fields:
 |assignedMemOffHeap| Double|Assigned Off-Heap Memory by Scheduler (MB)|
 |assignedTotalMem| Double|Assigned Total Memory by Scheduler (MB)|
 |assignedCpu| Double|Assigned CPU by Scheduler (%)|
+|schedulerDisplayResource| Boolean | Whether to display scheduler resource 
information|
 
 Sample response:
 
@@ -257,7 +376,7 @@ Sample response:
             "assignedTotalMem": 768,
             "assignedCpu": 80
         }
-    ]
+    ],
     "schedulerDisplayResource": true
 }
 ```

http://git-wip-us.apache.org/repos/asf/storm/blob/f5ad9288/docs/images/supervisor_page.png
----------------------------------------------------------------------
diff --git a/docs/images/supervisor_page.png b/docs/images/supervisor_page.png
new file mode 100644
index 0000000..5133681
Binary files /dev/null and b/docs/images/supervisor_page.png differ

http://git-wip-us.apache.org/repos/asf/storm/blob/f5ad9288/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj 
b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 93507ff..34cb7e0 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -48,7 +48,7 @@
             KillOptions RebalanceOptions ClusterSummary SupervisorSummary 
TopologySummary TopologyInfo TopologyHistoryInfo
             ExecutorSummary AuthorizationException GetInfoOptions 
NumErrorsChoice SettableBlobMeta ReadableBlobMeta
             BeginDownloadResult ListBlobsResult ComponentPageInfo 
TopologyPageInfo LogConfig LogLevel LogLevelAction
-            ProfileRequest ProfileAction NodeInfo LSTopoHistory])
+            ProfileRequest ProfileAction NodeInfo LSTopoHistory 
SupervisorPageInfo WorkerSummary WorkerResources])
   (:import [org.apache.storm.daemon Shutdownable StormCommon DaemonCommon])
   (:import [org.apache.storm.validation ConfigValidation])
   (:import [org.apache.storm.cluster ClusterStateContext DaemonType 
StormClusterStateImpl ClusterUtils])
@@ -98,6 +98,7 @@
 (def nimbus:num-getTopologyInfoWithOpts-calls 
(StormMetricsRegistry/registerMeter "nimbus:num-getTopologyInfoWithOpts-calls"))
 (def nimbus:num-getTopologyInfo-calls (StormMetricsRegistry/registerMeter 
"nimbus:num-getTopologyInfo-calls"))
 (def nimbus:num-getTopologyPageInfo-calls (StormMetricsRegistry/registerMeter 
"nimbus:num-getTopologyPageInfo-calls"))
+(def nimbus:num-getSupervisorPageInfo-calls 
(StormMetricsRegistry/registerMeter "nimbus:num-getSupervisorPageInfo-calls"))
 (def nimbus:num-getComponentPageInfo-calls (StormMetricsRegistry/registerMeter 
"nimbus:num-getComponentPageInfo-calls"))
 (def nimbus:num-shutdown-calls (StormMetricsRegistry/registerMeter 
"nimbus:num-shutdown-calls"))
 
@@ -216,6 +217,7 @@
      :id->sched-status (atom {})
      :node-id->resources (atom {}) ;;resources of supervisors
      :id->resources (atom {}) ;;resources of topologies
+     :id->worker-resources (atom {}) ; resources of workers per topology
      :cred-renewers (AuthUtils/GetCredentialRenewers conf)
      :topology-history-lock (Object.)
      :topo-history-state (ConfigUtils/nimbusTopoHistoryState conf)
@@ -433,7 +435,8 @@
       {})
     ))
 
-(defn- all-supervisor-info
+;; public for testing
+(defn all-supervisor-info
   ([storm-cluster-state] (all-supervisor-info storm-cluster-state nil))
   ([storm-cluster-state callback]
      (let [supervisor-ids (.supervisors storm-cluster-state callback)]
@@ -441,10 +444,8 @@
              (mapcat
               (fn [id]
                 (if-let [info (clojurify-supervisor-info (.supervisorInfo 
storm-cluster-state id))]
-                  [[id info]]
-                  ))
-              supervisor-ids))
-       )))
+                  [[id info]]))
+              supervisor-ids)))))
 
 (defn- all-scheduling-slots
   [nimbus topologies missing-assignment-topologies]
@@ -623,6 +624,10 @@
 (defn- to-executor-id [task-ids]
   [(first task-ids) (last task-ids)])
 
+;; convenience method for unit test
+(defn get-clojurified-task-info [topology storm-conf]
+  (clojurify-structure (StormCommon/stormTaskInfo topology storm-conf)))
+
 ;TODO: when translating this function, you should replace the map-val with a 
proper for loop HERE
 (defn- compute-executors [nimbus storm-id]
   (let [conf (:conf nimbus)
@@ -631,7 +636,7 @@
         component->executors (:component->executors storm-base)
         storm-conf (read-storm-conf-as-nimbus storm-id blob-store)
         topology (read-storm-topology-as-nimbus storm-id blob-store)
-        task->component (clojurify-structure(StormCommon/stormTaskInfo 
topology storm-conf))]
+        task->component (get-clojurified-task-info topology storm-conf)]
     (->> (StormCommon/stormTaskInfo topology storm-conf)
          (Utils/reverseMap)
          clojurify-structure
@@ -649,7 +654,7 @@
         executors (compute-executors nimbus storm-id)
         topology (read-storm-topology-as-nimbus storm-id blob-store)
         storm-conf (read-storm-conf-as-nimbus storm-id blob-store)
-        task->component (clojurify-structure (StormCommon/stormTaskInfo 
topology storm-conf))
+        task->component (get-clojurified-task-info topology storm-conf)
         executor->component (into {} (for [executor executors
                                            :let [start-task (first executor)
                                                  component (task->component 
start-task)]]
@@ -719,8 +724,7 @@
                                                     all-ports (-> (get 
all-scheduling-slots sid)
                                                                   
(set/difference dead-ports)
                                                                   ((fn [ports] 
(map int ports))))
-                                                    supervisor-details 
(SupervisorDetails. sid hostname scheduler-meta all-ports (:resources-map 
supervisor-info))
-                                                    ]]
+                                                    supervisor-details 
(SupervisorDetails. sid hostname scheduler-meta all-ports (:resources-map 
supervisor-info))]]
                                           {sid supervisor-details}))]
     (merge all-supervisor-details
            (into {}
@@ -801,6 +805,9 @@
 
     new-topology->executor->node+port))
 
+(defrecord TopologyResources [requested-mem-on-heap requested-mem-off-heap 
requested-cpu
+                              assigned-mem-on-heap assigned-mem-off-heap 
assigned-cpu])
+ 
 ;; public so it can be mocked out
 (defn compute-new-scheduler-assignments [nimbus existing-assignments 
topologies scratch-topology-id]
   (let [conf (:conf nimbus)
@@ -838,17 +845,33 @@
                                   (apply merge-with set/union))
 
         supervisors (read-all-supervisor-details nimbus all-scheduling-slots 
supervisor->dead-ports)
-        cluster (Cluster. (:inimbus nimbus) supervisors 
topology->scheduler-assignment conf)
-        _ (.setStatusMap cluster (deref (:id->sched-status nimbus)))
-        ;; call scheduler.schedule to schedule all the topologies
-        ;; the new assignments for all the topologies are in the cluster 
object.
-        _ (.schedule (:scheduler nimbus) topologies cluster)
-        _ (.setTopologyResourcesMap cluster @(:id->resources nimbus))
-        _ (if-not (conf SCHEDULER-DISPLAY-RESOURCE) 
(.updateAssignedMemoryForTopologyAndSupervisor cluster topologies))
-        ;;merge with existing statuses
-        _ (reset! (:id->sched-status nimbus) (merge (deref (:id->sched-status 
nimbus)) (.getStatusMap cluster)))
-        _ (reset! (:node-id->resources nimbus) (.getSupervisorsResourcesMap 
cluster))
-        _ (reset! (:id->resources nimbus) (.getTopologyResourcesMap cluster))]
+        cluster (Cluster. (:inimbus nimbus) supervisors 
topology->scheduler-assignment conf)]
+
+    ;; set the status map with existing topology statuses
+    (.setStatusMap cluster (deref (:id->sched-status nimbus)))
+    ;; call scheduler.schedule to schedule all the topologies
+    ;; the new assignments for all the topologies are in the cluster object.
+    (.schedule (:scheduler nimbus) topologies cluster)
+
+    ;;merge with existing statuses
+    (reset! (:id->sched-status nimbus) (merge (deref (:id->sched-status 
nimbus)) (.getStatusMap cluster)))
+    (reset! (:node-id->resources nimbus) (.getSupervisorsResourcesMap cluster))
+
+    (if-not (conf SCHEDULER-DISPLAY-RESOURCE) 
+      (.updateAssignedMemoryForTopologyAndSupervisor cluster topologies))
+
+    ; Remove both of swaps below at first opportunity. This is a hack for 
non-ras scheduler topology and worker resources.
+    (swap! (:id->resources nimbus) merge (into {} (map (fn [[k v]] [k 
(->TopologyResources (nth v 0) (nth v 1) (nth v 2)
+                                                                               
            (nth v 3) (nth v 4) (nth v 5))])
+                                                       
(.getTopologyResourcesMap cluster))))
+    ; Remove this also at first chance
+    (swap! (:id->worker-resources nimbus) merge 
+           (into {} (map (fn [[k v]] [k (map-val #(doto (WorkerResources.)
+                                                        (.set_mem_on_heap (nth 
% 0))
+                                                        (.set_mem_off_heap 
(nth % 1))
+                                                        (.set_cpu (nth % 2))) 
v)])
+                         (.getWorkerResourcesMap cluster))))
+
     (.getAssignments cluster)))
 
 (defn- map-diff
@@ -856,6 +879,48 @@
   [m1 m2]
   (into {} (filter (fn [[k v]] (not= v (m1 k))) m2)))
 
+(defn get-resources-for-topology [nimbus topo-id]
+  (or (get @(:id->resources nimbus) topo-id)
+      (try
+        (let [storm-cluster-state (:storm-cluster-state nimbus)
+              topology-details (read-topology-details nimbus topo-id)
+              assigned-resources (->> (clojurify-assignment (.assignmentInfo 
storm-cluster-state topo-id nil))
+                                      :worker->resources
+                                      (vals)
+                                        ; Default to [[0 0 0]] if there are no 
values
+                                      (#(or % [[0 0 0]]))
+                                        ; [[on-heap, off-heap, cpu]] -> 
[[on-heap], [off-heap], [cpu]]
+                                      (apply map vector)
+                                        ; [[on-heap], [off-heap], [cpu]] -> 
[on-heap-sum, off-heap-sum, cpu-sum]
+                                      (map (partial reduce +)))
+              worker-resources (->TopologyResources 
(.getTotalRequestedMemOnHeap topology-details)
+                                                    
(.getTotalRequestedMemOffHeap topology-details)
+                                                    (.getTotalRequestedCpu 
topology-details)
+                                                    (nth assigned-resources 0)
+                                                    (nth assigned-resources 1)
+                                                    (nth assigned-resources 
2))]
+          (swap! (:id->resources nimbus) assoc topo-id worker-resources)
+          worker-resources)
+        (catch KeyNotFoundException e
+          ; This can happen when a topology is first coming up.
+          ; It's thrown by the blobstore code.
+          (log-error e "Failed to get topology details")
+          (->TopologyResources 0 0 0 0 0 0)))))
+
+(defn- get-worker-resources-for-topology [nimbus topo-id]
+  (or (get @(:id->worker-resources nimbus) topo-id)
+      (try
+        (let [storm-cluster-state (:storm-cluster-state nimbus)
+              assigned-resources (->> (clojurify-assignment (.assignmentInfo 
storm-cluster-state topo-id nil))
+                                      :worker->resources)
+              worker-resources (into {} (map #(identity {(WorkerSlot. (first 
(key %)) (second (key %)))  
+                                                         (doto 
(WorkerResources.)
+                                                             (.set_mem_on_heap 
(nth (val %) 0))
+                                                             
(.set_mem_off_heap (nth (val %) 1))
+                                                             (.set_cpu (nth 
(val %) 2)))}) assigned-resources))]
+          (swap! (:id->worker-resources nimbus) assoc topo-id worker-resources)
+          worker-resources))))
+          
 (defn changed-executors [executor->node+port new-executor->node+port]
   (let [executor->node+port (if executor->node+port (sort executor->node+port) 
nil)
         new-executor->node+port (if new-executor->node+port (sort 
new-executor->node+port) nil)
@@ -947,6 +1012,10 @@
                                                  start-times
                                                  worker->resources)}))]
 
+    (when (not= new-assignments existing-assignments)
+      (log-debug "RESETTING id->resources and id->worker-resources cache!")
+      (reset! (:id->resources nimbus) {})
+      (reset! (:id->worker-resources nimbus) {}))
     ;; tasks figure out what tasks to talk to by looking at topology at runtime
     ;; only log/set when there's been a change to the assignment
     (doseq [[topology-id assignment] new-assignments
@@ -1015,6 +1084,18 @@
       (throw (AlreadyAliveException. (str storm-name " is already active"))))
     ))
 
+(defn try-read-storm-conf [conf storm-id blob-store]
+  (try-cause
+    (read-storm-conf-as-nimbus conf storm-id blob-store)
+    (catch KeyNotFoundException e
+       (throw (NotAliveException. (str storm-id))))))
+
+(defn try-read-storm-conf-from-name [conf storm-name nimbus]
+  (let [storm-cluster-state (:storm-cluster-state nimbus)
+        blob-store (:blob-store nimbus)
+        id (StormCommon/getStormId storm-cluster-state storm-name)]
+   (try-read-storm-conf conf id blob-store)))
+
 (defn check-authorization!
   ([nimbus storm-name storm-conf operation context]
      (let [aclHandler (:authorization-handler nimbus)
@@ -1040,6 +1121,15 @@
   ([nimbus storm-name storm-conf operation]
      (check-authorization! nimbus storm-name storm-conf operation 
(ReqContext/context))))
 
+;; no-throw version of check-authorization!
+(defn is-authorized?
+  [nimbus conf blob-store operation topology-id]
+  (let [topology-conf (try-read-storm-conf conf topology-id blob-store)
+        storm-name (topology-conf TOPOLOGY-NAME)]
+    (try (check-authorization! nimbus storm-name topology-conf operation)
+         true
+      (catch AuthorizationException e false))))
+
 (defn code-ids [blob-store]
   (let [to-id (reify KeyFilter
                 (filter [this key] (ConfigUtils/getIdFromBlobKey key)))]
@@ -1352,25 +1442,56 @@
 (defmethod blob-sync :local [conf nimbus]
   nil)
 
+(defn make-supervisor-summary 
+  [nimbus id info]
+    (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
+          sup-sum (SupervisorSummary. (:hostname info)
+                                      (:uptime-secs info)
+                                      (count ports)
+                                      (count (:used-ports info))
+                                      id)]
+      (.set_total_resources sup-sum (map-val double (:resources-map info)))
+      (when-let [[total-mem total-cpu used-mem used-cpu] (.get 
@(:node-id->resources nimbus) id)]
+        (.set_used_mem sup-sum (Utils/nullToZero used-mem))
+        (.set_used_cpu sup-sum (Utils/nullToZero used-cpu)))
+      (when-let [version (:version info)] (.set_version sup-sum version))
+      sup-sum))
+
+(defn user-and-supervisor-topos
+  [nimbus conf blob-store assignments supervisor-id]
+  (let [topo-id->supervisors 
+          (into {} (for [[topo-id java-assignment] assignments] 
+                     (let [assignment (clojurify-assignment java-assignment)]
+                       {topo-id (into #{} 
+                                      (map #(first (second %)) 
+                                           (:executor->node+port 
assignment)))})))
+        supervisor-topologies (keys (filter #(get (val %) supervisor-id) 
topo-id->supervisors))]
+    {:supervisor-topologies supervisor-topologies
+     :user-topologies (into #{} (filter (partial is-authorized? nimbus 
+                                                 conf 
+                                                 blob-store 
+                                                 "getTopology") 
+                  supervisor-topologies))}))
+
+(defn topology-assignments 
+  [storm-cluster-state]
+  (let [assigned-topology-ids (.assignments storm-cluster-state nil)]
+    (into {} (for [tid assigned-topology-ids]
+               {tid (.assignmentInfo storm-cluster-state tid nil)}))))
+
+(defn get-launch-time-secs 
+  [base storm-id]
+  (if base (:launch-time-secs base)
+    (throw
+      (NotAliveException. (str storm-id)))))
+
 (defn get-cluster-info [nimbus]
   (let [storm-cluster-state (:storm-cluster-state nimbus)
         supervisor-infos (all-supervisor-info storm-cluster-state)
         ;; TODO: need to get the port info about supervisors...
         ;; in standalone just look at metadata, otherwise just say N/A?
         supervisor-summaries (dofor [[id info] supervisor-infos]
-                                    (let [ports (set (:meta info)) ;;TODO: 
this is only true for standalone
-                                          sup-sum (SupervisorSummary. 
(:hostname info)
-                                                                      
(:uptime-secs info)
-                                                                      (count 
ports)
-                                                                      (count 
(:used-ports info))
-                                                                      id) ]
-                                      ;TODO: when translating this function, 
you should replace the map-val with a proper for loop HERE
-                                      (.set_total_resources sup-sum (map-val 
double (:resources-map info)))
-                                      (when-let [[total-mem total-cpu used-mem 
used-cpu] (.get @(:node-id->resources nimbus) id)]
-                                        (.set_used_mem sup-sum used-mem)
-                                        (.set_used_cpu sup-sum used-cpu))
-                                      (when-let [version (:version info)] 
(.set_version sup-sum version))
-                                      sup-sum))
+                                    (make-supervisor-summary nimbus id info))
         nimbus-uptime (. (:uptime nimbus) upTime)
         bases (nimbus-topology-bases storm-cluster-state)
         nimbuses (.nimbuses storm-cluster-state)
@@ -1402,13 +1523,13 @@
                                                                     
(extract-status-str base))]
                                     (when-let [owner (:owner base)] 
(.set_owner topo-summ owner))
                                     (when-let [sched-status (.get 
@(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status))
-                                    (when-let [resources (.get 
@(:id->resources nimbus) id)]
-                                      (.set_requested_memonheap topo-summ (get 
resources 0))
-                                      (.set_requested_memoffheap topo-summ 
(get resources 1))
-                                      (.set_requested_cpu topo-summ (get 
resources 2))
-                                      (.set_assigned_memonheap topo-summ (get 
resources 3))
-                                      (.set_assigned_memoffheap topo-summ (get 
resources 4))
-                                      (.set_assigned_cpu topo-summ (get 
resources 5)))
+                                    (when-let [resources 
(get-resources-for-topology nimbus id)]
+                                      (.set_requested_memonheap topo-summ 
(:requested-mem-on-heap resources))
+                                      (.set_requested_memoffheap topo-summ 
(:requested-mem-off-heap resources))
+                                      (.set_requested_cpu topo-summ 
(:requested-cpu resources))
+                                      (.set_assigned_memonheap topo-summ 
(:assigned-mem-on-heap resources))
+                                      (.set_assigned_memoffheap topo-summ 
(:assigned-mem-off-heap resources))
+                                      (.set_assigned_cpu topo-summ 
(:assigned-cpu resources)))
                                     (.set_replication_count topo-summ 
(get-blob-replication-count (ConfigUtils/masterStormCodeKey id) nimbus))
                                     topo-summ))
         ret (ClusterSummary. supervisor-summaries
@@ -1470,11 +1591,9 @@
                                           topology-conf
                                           operation)
                   topology (try-read-storm-topology storm-id blob-store)
-                  task->component (clojurify-structure 
(StormCommon/stormTaskInfo topology topology-conf))
+                  task->component (get-clojurified-task-info topology 
topology-conf)
                   base (clojurify-storm-base (.stormBase storm-cluster-state 
storm-id nil))
-                  launch-time-secs (if base (:launch-time-secs base)
-                                     (throw
-                                       (NotAliveException. (str storm-id))))
+                  launch-time-secs (get-launch-time-secs base storm-id)
                   assignment (clojurify-assignment (.assignmentInfo 
storm-cluster-state storm-id nil))
                   beats (get @(:heartbeats-cache nimbus) storm-id)
                   all-components (set (vals task->component))]
@@ -1883,13 +2002,13 @@
                            )]
             (when-let [owner (:owner base)] (.set_owner topo-info owner))
             (when-let [sched-status (.get @(:id->sched-status nimbus) 
storm-id)] (.set_sched_status topo-info sched-status))
-            (when-let [resources (.get @(:id->resources nimbus) storm-id)]
-              (.set_requested_memonheap topo-info (get resources 0))
-              (.set_requested_memoffheap topo-info (get resources 1))
-              (.set_requested_cpu topo-info (get resources 2))
-              (.set_assigned_memonheap topo-info (get resources 3))
-              (.set_assigned_memoffheap topo-info (get resources 4))
-              (.set_assigned_cpu topo-info (get resources 5)))
+            (when-let [resources (get-resources-for-topology nimbus storm-id)]
+              (.set_requested_memonheap topo-info (:requested-mem-on-heap 
resources))
+              (.set_requested_memoffheap topo-info (:requested-mem-off-heap 
resources))
+              (.set_requested_cpu topo-info (:requested-cpu resources))
+              (.set_assigned_memonheap topo-info (:assigned-mem-on-heap 
resources))
+              (.set_assigned_memoffheap topo-info (:assigned-mem-off-heap 
resources))
+              (.set_assigned_cpu topo-info (:assigned-cpu resources)))
             (when-let [component->debug (:component->debug base)]
               ;TODO: when translating this function, you should replace the 
map-val with a proper for loop HERE
               (.set_component_debug topo-info (map-val 
converter/thriftify-debugoptions component->debug)))
@@ -1902,6 +2021,7 @@
                                   topology-id
                                   (doto (GetInfoOptions.) (.set_num_err_choice 
NumErrorsChoice/ALL))))
 
+
       (^String beginCreateBlob [this
                                 ^String blob-key
                                 ^SettableBlobMeta blob-meta]
@@ -2053,44 +2173,109 @@
       (^TopologyPageInfo getTopologyPageInfo
         [this ^String topo-id ^String window ^boolean include-sys?]
         (.mark nimbus:num-getTopologyPageInfo-calls)
-        (let [info (get-common-topo-info topo-id "getTopologyPageInfo")
-
-              exec->node+port (:executor->node+port (:assignment info))
+        (let [topo-info (get-common-topo-info topo-id "getTopologyPageInfo")
+              {:keys [storm-name
+                      storm-cluster-state
+                      launch-time-secs
+                      assignment
+                      beats
+                      task->component
+                      topology
+                      base]} topo-info
+              exec->node+port (:executor->node+port assignment)
+              node->host (:node->host assignment)
+              worker->resources (get-worker-resources-for-topology nimbus 
topo-id)
+              worker-summaries (StatsUtil/aggWorkerStats topo-id 
+                                                         storm-name
+                                                         task->component
+                                                         beats
+                                                         exec->node+port
+                                                         node->host
+                                                         worker->resources
+                                                         include-sys?
+                                                         true)  ;; this is the topology page, so we know the user is authorized
               topo-page-info (StatsUtil/aggTopoExecsStats topo-id
                                                           exec->node+port
-                                                          (:task->component 
info)
-                                                          (:beats info)
-                                                          (:topology info)
+                                                          task->component
+                                                          beats
+                                                          topology
                                                           window
                                                           include-sys?
-                                                          
(:storm-cluster-state info))]
-          (when-let [owner (:owner (:base info))]
+                                                          storm-cluster-state)]
+          (.set_workers topo-page-info worker-summaries)
+          (when-let [owner (:owner base)]
             (.set_owner topo-page-info owner))
           (when-let [sched-status (.get @(:id->sched-status nimbus) topo-id)]
             (.set_sched_status topo-page-info sched-status))
-          (when-let [resources (.get @(:id->resources nimbus) topo-id)]
-            (.set_requested_memonheap topo-page-info (get resources 0))
-            (.set_requested_memoffheap topo-page-info (get resources 1))
-            (.set_requested_cpu topo-page-info (get resources 2))
-            (.set_assigned_memonheap topo-page-info (get resources 3))
-            (.set_assigned_memoffheap topo-page-info (get resources 4))
-            (.set_assigned_cpu topo-page-info (get resources 5)))
+          (when-let [resources (get-resources-for-topology nimbus topo-id)]
+            (.set_requested_memonheap topo-page-info (:requested-mem-on-heap 
resources))
+            (.set_requested_memoffheap topo-page-info (:requested-mem-off-heap 
resources))
+            (.set_requested_cpu topo-page-info (:requested-cpu resources))
+            (.set_assigned_memonheap topo-page-info (:assigned-mem-on-heap 
resources))
+            (.set_assigned_memoffheap topo-page-info (:assigned-mem-off-heap 
resources))
+            (.set_assigned_cpu topo-page-info (:assigned-cpu resources)))
           (doto topo-page-info
-            (.set_name (:storm-name info))
-            (.set_status (extract-status-str (:base info)))
-            (.set_uptime_secs (Time/deltaSecs (:launch-time-secs info)))
+            (.set_name storm-name)
+            (.set_status (extract-status-str base))
+            (.set_uptime_secs (Time/deltaSecs launch-time-secs))
             (.set_topology_conf (JSONValue/toJSONString
                                   (try-read-storm-conf conf
                                                        topo-id
                                                        (:blob-store nimbus))))
             (.set_replication_count (get-blob-replication-count 
(ConfigUtils/masterStormCodeKey topo-id) nimbus)))
           (when-let [debug-options
-                     (get-in info [:base :component->debug topo-id])]
+                     (get-in topo-info [:base :component->debug topo-id])]
             (.set_debug_options
               topo-page-info
               (converter/thriftify-debugoptions debug-options)))
           topo-page-info))
 
+      (^SupervisorPageInfo getSupervisorPageInfo
+        [this
+         ^String supervisor-id
+         ^String host 
+         ^boolean include-sys?]
+        (.mark nimbus:num-getSupervisorPageInfo-calls)
+        (let [storm-cluster-state (:storm-cluster-state nimbus)
+              supervisor-infos (all-supervisor-info storm-cluster-state)
+              host->supervisor-id (Utils/reverseMap (map-val :hostname 
supervisor-infos))
+              supervisor-ids (if (nil? supervisor-id)
+                                (get host->supervisor-id host)
+                                  [supervisor-id])
+              page-info (SupervisorPageInfo.)]
+              (doseq [sid supervisor-ids]
+                (let [supervisor-info (get supervisor-infos sid)
+                      sup-sum (make-supervisor-summary nimbus sid 
supervisor-info)
+                      _ (.add_to_supervisor_summaries page-info sup-sum)
+                      topo-id->assignments (topology-assignments 
storm-cluster-state)
+                      {:keys [user-topologies 
+                              supervisor-topologies]} 
(user-and-supervisor-topos nimbus
+                                                                               
  conf
+                                                                               
  blob-store
+                                                                               
  topo-id->assignments 
+                                                                               
  sid)]
+                  (doseq [storm-id supervisor-topologies]
+                      (let [topo-info (get-common-topo-info storm-id 
"getSupervisorPageInfo")
+                            {:keys [storm-name
+                                    assignment
+                                    beats
+                                    task->component]} topo-info
+                            exec->node+port (:executor->node+port assignment)
+                            node->host (:node->host assignment)
+                            worker->resources 
(get-worker-resources-for-topology nimbus storm-id)]
+                        (doseq [worker-summary (StatsUtil/aggWorkerStats 
storm-id 
+                                                                         
storm-name
+                                                                         
task->component
+                                                                         beats
+                                                                         
exec->node+port
+                                                                         
node->host
+                                                                         
worker->resources
+                                                                         
include-sys?
+                                                                         
(boolean (get user-topologies storm-id))
+                                                                         sid)]
+                          (.add_to_worker_summaries page-info 
worker-summary)))))) 
+              page-info))
+
       (^ComponentPageInfo getComponentPageInfo
         [this
          ^String topo-id

http://git-wip-us.apache.org/repos/asf/storm/blob/f5ad9288/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj 
b/storm-core/src/clj/org/apache/storm/ui/core.clj
index 831725a..a14b1b8 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -39,7 +39,7 @@
             TopologyStats CommonAggregateStats ComponentAggregateStats
             ComponentType BoltAggregateStats SpoutAggregateStats
             ExecutorAggregateStats SpecificAggregateStats ComponentPageInfo
-            LogConfig LogLevel LogLevelAction])
+            LogConfig LogLevel LogLevelAction SupervisorPageInfo 
WorkerSummary])
   (:import [org.apache.storm.security.auth AuthUtils ReqContext])
   (:import [org.apache.storm.generated AuthorizationException ProfileRequest 
ProfileAction NodeInfo])
   (:import [org.apache.storm.security.auth AuthUtils])
@@ -67,6 +67,7 @@
 (def ui:num-cluster-configuration-http-requests 
(StormMetricsRegistry/registerMeter 
"ui:num-cluster-configuration-http-requests")) 
 (def ui:num-cluster-summary-http-requests (StormMetricsRegistry/registerMeter 
"ui:num-cluster-summary-http-requests")) 
 (def ui:num-nimbus-summary-http-requests (StormMetricsRegistry/registerMeter 
"ui:num-nimbus-summary-http-requests")) 
+(def ui:num-supervisor-http-requests (StormMetricsRegistry/registerMeter 
"ui:num-supervisor-http-requests")) 
 (def ui:num-supervisor-summary-http-requests 
(StormMetricsRegistry/registerMeter "ui:num-supervisor-summary-http-requests")) 
 (def ui:num-all-topologies-summary-http-requests 
(StormMetricsRegistry/registerMeter 
"ui:num-all-topologies-summary-http-requests")) 
 (def ui:num-topology-page-http-requests (StormMetricsRegistry/registerMeter 
"ui:num-topology-page-http-requests"))
@@ -429,26 +430,77 @@
           "nimbusUpTime" (UIHelpers/prettyUptimeSec uptime)
           "nimbusUpTimeSeconds" uptime}))})))
 
+(defn worker-summary-to-json
+  [secure? ^WorkerSummary worker-summary]
+  (let [host (.get_host worker-summary)
+        port (.get_port worker-summary)
+        topology-id (.get_topology_id worker-summary)
+        uptime-secs (.get_uptime_secs worker-summary)]
+    {"supervisorId" (.get_supervisor_id worker-summary)
+     "host" host
+     "port" port
+     "topologyId" topology-id
+     "topologyName" (.get_topology_name worker-summary)
+     "executorsTotal" (.get_num_executors worker-summary)
+     "assignedMemOnHeap" (.get_assigned_memonheap worker-summary)
+     "assignedMemOffHeap" (.get_assigned_memoffheap worker-summary)
+     "assignedCpu" (.get_assigned_cpu worker-summary)
+     "componentNumTasks" (.get_component_to_num_tasks worker-summary)
+     "uptime" (UIHelpers/prettyUptimeSec uptime-secs)
+     "uptimeSeconds" uptime-secs
+     "workerLogLink" (worker-log-link host port topology-id secure?)}))
+
+(defn supervisor-summary-to-json 
+  [summary]
+  (let [slotsTotal (.get_num_workers summary)
+        slotsUsed (.get_num_used_workers summary)
+        slotsFree (max (- slotsTotal slotsUsed) 0)
+        totalMem (get (.get_total_resources summary) 
Config/SUPERVISOR_MEMORY_CAPACITY_MB)
+        totalCpu (get (.get_total_resources summary) 
Config/SUPERVISOR_CPU_CAPACITY)
+        usedMem (.get_used_mem summary)
+        usedCpu (.get_used_cpu summary)
+        availMem (max (- totalMem usedMem) 0)
+        availCpu (max (- totalCpu usedCpu) 0)]
+  {"id" (.get_supervisor_id summary)
+   "host" (.get_host summary)
+   "uptime" (UIHelpers/prettyUptimeSec (.get_uptime_secs summary))
+   "uptimeSeconds" (.get_uptime_secs summary)
+   "slotsTotal" slotsTotal
+   "slotsUsed" slotsUsed
+   "slotsFree" slotsFree
+   "totalMem" totalMem
+   "totalCpu" totalCpu
+   "usedMem" usedMem
+   "usedCpu" usedCpu
+   "logLink" (supervisor-log-link (.get_host summary))
+   "availMem" availMem
+   "availCpu" availCpu
+   "version" (.get_version summary)}))
+
+(defn supervisor-page-info
+  ([supervisor-id host include-sys? secure?]
+     (thrift/with-configured-nimbus-connection 
+        nimbus (supervisor-page-info (.getSupervisorPageInfo ^Nimbus$Client 
nimbus
+                                                      supervisor-id
+                                                      host
+                                                      include-sys?) secure?)))
+  ([^SupervisorPageInfo supervisor-page-info secure?]
+    ;; ask nimbus to return supervisor workers + any details user is allowed
+    ;; access on a per-topology basis (i.e. components)
+    (let [supervisors-json (map supervisor-summary-to-json 
(.get_supervisor_summaries supervisor-page-info))]
+      {"supervisors" supervisors-json
+       "schedulerDisplayResource" (*STORM-CONF* 
Config/SCHEDULER_DISPLAY_RESOURCE)
+       "workers" (into [] (for [^WorkerSummary worker-summary 
(.get_worker_summaries supervisor-page-info)]
+                            (worker-summary-to-json secure? 
worker-summary)))})))
+
 (defn supervisor-summary
   ([]
    (thrift/with-configured-nimbus-connection nimbus
                 (supervisor-summary
                   (.get_supervisors (.getClusterInfo ^Nimbus$Client nimbus)))))
   ([summs]
-   {"supervisors"
-    (for [^SupervisorSummary s summs]
-      {"id" (.get_supervisor_id s)
-       "host" (.get_host s)
-       "uptime" (UIHelpers/prettyUptimeSec (.get_uptime_secs s))
-       "uptimeSeconds" (.get_uptime_secs s)
-       "slotsTotal" (.get_num_workers s)
-       "slotsUsed" (.get_num_used_workers s)
-       "totalMem" (get (.get_total_resources s) 
Config/SUPERVISOR_MEMORY_CAPACITY_MB)
-       "totalCpu" (get (.get_total_resources s) Config/SUPERVISOR_CPU_CAPACITY)
-       "usedMem" (.get_used_mem s)
-       "usedCpu" (.get_used_cpu s)
-       "logLink" (supervisor-log-link (.get_host s))
-       "version" (.get_version s)})
+   {"supervisors" (for [^SupervisorSummary s summs]
+                    (supervisor-summary-to-json s))
     "schedulerDisplayResource" (*STORM-CONF* 
Config/SCHEDULER_DISPLAY_RESOURCE)}))
 
 (defn all-topologies-summary
@@ -608,6 +660,8 @@
      "assignedTotalMem" (+ (.get_assigned_memonheap topo-info) 
(.get_assigned_memoffheap topo-info))
      "assignedCpu" (.get_assigned_cpu topo-info)
      "topologyStats" topo-stats
+     "workers"  (map (partial worker-summary-to-json secure?)
+                     (.get_workers topo-info))
      "spouts" (map (partial comp-agg-stats-json id secure?)
                    (.get_id_to_spout_agg_stats topo-info))
      "bolts" (map (partial comp-agg-stats-json id secure?)
@@ -1070,6 +1124,16 @@
     (assert-authorized-user "getClusterInfo")
     (json-response (assoc (supervisor-summary)
                      "logviewerPort" (*STORM-CONF* LOGVIEWER-PORT)) (:callback 
m)))
+  (GET "/api/v1/supervisor" [:as {:keys [cookies servlet-request scheme]} & m]
+    (.mark ui:num-supervisor-http-requests)
+    (populate-context! servlet-request)
+    (assert-authorized-user "getSupervisorPageInfo")
+    ;; supervisor takes either an id or a host query parameter, and technically both
+    ;; that said, if both the id and host are provided, the id wins
+    (let [id (:id m)
+          host (:host m)]
+      (json-response (assoc (supervisor-page-info id host (check-include-sys? 
(:sys m)) (= scheme :https))
+                            "logviewerPort" (*STORM-CONF* LOGVIEWER-PORT)) 
(:callback m))))
   (GET "/api/v1/topology/summary" [:as {:keys [cookies servlet-request]} & m]
     (.mark ui:num-all-topologies-summary-http-requests)
     (populate-context! servlet-request)

Reply via email to