STORM-1994: Add table with per-topology and worker resource usage and components in (new) supervisor and topology pages
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f5ad9288 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f5ad9288 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f5ad9288 Branch: refs/heads/master Commit: f5ad92888b6f1470e4d7f179fbe5ae73e37a07f7 Parents: 726f3ea Author: Alessandro Bellina <[email protected]> Authored: Wed Jul 6 14:23:18 2016 -0500 Committer: Alessandro Bellina <[email protected]> Committed: Mon Jul 25 23:21:23 2016 -0500 ---------------------------------------------------------------------- docs/STORM-UI-REST-API.md | 121 +- docs/images/supervisor_page.png | Bin 0 -> 133290 bytes .../src/clj/org/apache/storm/daemon/nimbus.clj | 327 +- storm-core/src/clj/org/apache/storm/ui/core.clj | 94 +- .../jvm/org/apache/storm/generated/Nimbus.java | 3486 ++++++++++++------ .../storm/generated/SupervisorPageInfo.java | 624 ++++ .../storm/generated/TopologyPageInfo.java | 284 +- .../apache/storm/generated/WorkerSummary.java | 1880 ++++++++++ .../jvm/org/apache/storm/scheduler/Cluster.java | 217 +- .../resource/ResourceAwareScheduler.java | 9 + .../auth/authorizer/SimpleACLAuthorizer.java | 7 +- .../jvm/org/apache/storm/stats/StatsUtil.java | 132 + storm-core/src/py/storm/Nimbus-remote | 7 + storm-core/src/py/storm/Nimbus.py | 272 +- storm-core/src/py/storm/ttypes.py | 1457 ++++++-- storm-core/src/storm.thrift | 25 + storm-core/src/ui/public/component.html | 8 + storm-core/src/ui/public/css/style.css | 20 + storm-core/src/ui/public/js/script.js | 191 + storm-core/src/ui/public/supervisor.html | 132 + .../public/templates/index-page-template.html | 4 +- .../templates/supervisor-page-template.html | 145 + .../templates/topology-page-template.html | 208 +- storm-core/src/ui/public/topology.html | 12 +- .../test/clj/org/apache/storm/nimbus_test.clj | 79 +- .../org/apache/storm/stats/TestStatsUtil.java | 266 ++ 26 files changed, 8352 insertions(+), 1655 deletions(-) 
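One of the refactors in the nimbus.clj diff below replaces a positional 6-element per-topology resource vector (accessed as `(get resources 0)` … `(get resources 5)`) with a named `TopologyResources` record. A minimal sketch of that positional-to-named mapping, with hypothetical values; the field order matches the record definition in the diff:

```python
# Sketch of the TopologyResources refactor in nimbus.clj: the positional
# vector [req-mem-on-heap req-mem-off-heap req-cpu
#         assigned-mem-on-heap assigned-mem-off-heap assigned-cpu]
# becomes a record with named fields. Values here are hypothetical.
from collections import namedtuple

TopologyResources = namedtuple("TopologyResources", [
    "requested_mem_on_heap", "requested_mem_off_heap", "requested_cpu",
    "assigned_mem_on_heap", "assigned_mem_off_heap", "assigned_cpu",
])

legacy = [128.0, 64.0, 50.0, 768.0, 0.0, 80.0]  # old positional form
res = TopologyResources(*legacy)

print(res.assigned_mem_on_heap)  # was (get resources 3)
print(res.requested_cpu)         # was (get resources 2)
```

Named access removes the off-by-one risk the old `(get resources N)` calls carried across nimbus.clj, ui paths, and the topology page.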
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/f5ad9288/docs/STORM-UI-REST-API.md ---------------------------------------------------------------------- diff --git a/docs/STORM-UI-REST-API.md b/docs/STORM-UI-REST-API.md index a2fc39a..2a844a5 100644 --- a/docs/STORM-UI-REST-API.md +++ b/docs/STORM-UI-REST-API.md @@ -125,6 +125,7 @@ Response fields: |uptimeSeconds| Integer| Shows how long the supervisor is running in seconds| |slotsTotal| Integer| Total number of available worker slots for this supervisor| |slotsUsed| Integer| Number of worker slots used on this supervisor| +|schedulerDisplayResource| Boolean | Whether to display scheduler resource information| |totalMem| Double| Total memory capacity on this supervisor| |totalCpu| Double| Total CPU capacity on this supervisor| |usedMem| Double| Used memory capacity on this supervisor| @@ -207,6 +208,123 @@ Sample response: } ``` +### /api/v1/supervisor (GET) + +Returns summary for a supervisor by id, or all supervisors running on a host. + +Examples: + +```no-highlight + 1. By host: http://ui-daemon-host-name:8080/api/v1/supervisor?host=supervisor-daemon-host-name + 2. By id: http://ui-daemon-host-name:8080/api/v1/supervisor?id=f5449110-1daa-43e2-89e3-69917b16dec9-192.168.1.1 +``` + +Request parameters: + +|Parameter |Value |Description | +|----------|--------|-------------| +|id |String. Supervisor id | If specified, respond with the supervisor and worker stats with id. Note that when id is specified, the host argument is ignored. | +|host |String. Host name| If specified, respond with all supervisors and worker stats in the host (normally just one)| +|sys |String. Values 1 or 0. 
Default value 0| Controls including sys stats part of the response| + +Response fields: + +|Field |Value|Description| +|--- |--- |--- +|supervisors| Array| Array of supervisor summaries| +|workers| Array| Array of worker summaries | +|schedulerDisplayResource| Boolean | Whether to display scheduler resource information| + +Each supervisor is defined by: + +|Field |Value|Description| +|--- |--- |--- +|id| String | Supervisor's id| +|host| String| Supervisor's host name| +|uptime| String| Shows how long the supervisor is running| +|uptimeSeconds| Integer| Shows how long the supervisor is running in seconds| +|slotsTotal| Integer| Total number of worker slots for this supervisor| +|slotsUsed| Integer| Number of worker slots used on this supervisor| +|totalMem| Double| Total memory capacity on this supervisor| +|totalCpu| Double| Total CPU capacity on this supervisor| +|usedMem| Double| Used memory capacity on this supervisor| +|usedCpu| Double| Used CPU capacity on this supervisor| + +Each worker is defined by: + +|Field |Value |Description| +|-------|-------|-----------| +|supervisorId | String| Supervisor's id| +|host | String | Worker's host name| +|port | Integer | Worker's port| +|topologyId | String | Topology Id| +|topologyName | String | Topology Name| +|executorsTotal | Integer | Number of executors used by the topology in this worker| +|assignedMemOnHeap | Double | Assigned On-Heap Memory by Scheduler (MB)| +|assignedMemOffHeap | Double | Assigned Off-Heap Memory by Scheduler (MB)| +|assignedCpu | Number | Assigned CPU by Scheduler (%)| +|componentNumTasks | Dictionary | Components -> # of executing tasks| +|uptime| String| Shows how long the worker is running| +|uptimeSeconds| Integer| Shows how long the worker is running in seconds| +|workerLogLink | String | Link to worker log viewer page| + +Sample response: + +```json +{ + "supervisors": [{ + "totalMem": 4096.0, + "host":"192.168.10.237", + "id":"bdfe8eff-f1d8-4bce-81f5-9d3ae1bf432e-169.254.129.212", + 
"uptime":"7m 8s", + "totalCpu":400.0, + "usedCpu":495.0, + "usedMem":3432.0, + "slotsUsed":2, + "version":"0.10.1", + "slotsTotal":4, + "uptimeSeconds":428 + }], + "schedulerDisplayResource":true, + "workers":[{ + "topologyName":"ras", + "topologyId":"ras-4-1460229987", + "host":"192.168.10.237", + "supervisorId":"bdfe8eff-f1d8-4bce-81f5-9d3ae1bf432e-169.254.129.212", + "assignedMemOnHeap":704.0, + "uptime":"2m 47s", + "uptimeSeconds":167, + "port":6707, + "workerLogLink":"http:\/\/192.168.10.237:8000\/log?file=ras-4-1460229987%2F6707%2Fworker.log", + "componentNumTasks": { + "word":5 + }, + "executorsTotal":8, + "assignedCpu":130.0, + "assignedMemOffHeap":80.0 + }, + { + "topologyName":"ras", + "topologyId":"ras-4-1460229987", + "host":"192.168.10.237", + "supervisorId":"bdfe8eff-f1d8-4bce-81f5-9d3ae1bf432e-169.254.129.212", + "assignedMemOnHeap":904.0, + "uptime":"2m 53s", + "port":6706, + "workerLogLink":"http:\/\/192.168.10.237:8000\/log?file=ras-4-1460229987%2F6706%2Fworker.log", + "componentNumTasks":{ + "exclaim2":2, + "exclaim1":3, + "word":5 + }, + "executorsTotal":10, + "uptimeSeconds":173, + "assignedCpu":165.0, + "assignedMemOffHeap":80.0 + }] +} +``` + ### /api/v1/topology/summary (GET) Returns summary information for all topologies. 
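Clients of the new `/api/v1/supervisor` endpoint typically roll up the flat `workers` array themselves; a minimal sketch (supervisor ids shortened, only the documented field names assumed) that totals scheduler-assigned memory and CPU per supervisor:

```python
# Aggregate assigned resources per supervisor from a /api/v1/supervisor
# response shaped like the sample above. Memory fields are MB, CPU is %.
from collections import defaultdict

def assigned_per_supervisor(response):
    """Sum scheduler-assigned memory and CPU over workers, keyed by supervisorId."""
    totals = defaultdict(lambda: {"mem_mb": 0.0, "cpu": 0.0, "workers": 0})
    for w in response.get("workers", []):
        t = totals[w["supervisorId"]]
        t["mem_mb"] += w["assignedMemOnHeap"] + w["assignedMemOffHeap"]
        t["cpu"] += w["assignedCpu"]
        t["workers"] += 1
    return dict(totals)

# Trimmed from the sample response; "sup-1" stands in for the long id.
sample = {
    "workers": [
        {"supervisorId": "sup-1", "assignedMemOnHeap": 704.0,
         "assignedMemOffHeap": 80.0, "assignedCpu": 130.0},
        {"supervisorId": "sup-1", "assignedMemOnHeap": 904.0,
         "assignedMemOffHeap": 80.0, "assignedCpu": 165.0},
    ],
}

print(assigned_per_supervisor(sample))
```

Note the per-worker totals need not match the supervisor's `usedMem`/`usedCpu`, since those cover every topology scheduled on the node, not just the workers returned for one query.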
@@ -232,6 +350,7 @@ Response fields: |assignedMemOffHeap| Double|Assigned Off-Heap Memory by Scheduler (MB)| |assignedTotalMem| Double|Assigned Total Memory by Scheduler (MB)| |assignedCpu| Double|Assigned CPU by Scheduler (%)| +|schedulerDisplayResource| Boolean | Whether to display scheduler resource information| Sample response: @@ -257,7 +376,7 @@ Sample response: "assignedTotalMem": 768, "assignedCpu": 80 } - ] + ], "schedulerDisplayResource": true } ``` http://git-wip-us.apache.org/repos/asf/storm/blob/f5ad9288/docs/images/supervisor_page.png ---------------------------------------------------------------------- diff --git a/docs/images/supervisor_page.png b/docs/images/supervisor_page.png new file mode 100644 index 0000000..5133681 Binary files /dev/null and b/docs/images/supervisor_page.png differ http://git-wip-us.apache.org/repos/asf/storm/blob/f5ad9288/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index 93507ff..34cb7e0 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -48,7 +48,7 @@ KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo TopologyHistoryInfo ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice SettableBlobMeta ReadableBlobMeta BeginDownloadResult ListBlobsResult ComponentPageInfo TopologyPageInfo LogConfig LogLevel LogLevelAction - ProfileRequest ProfileAction NodeInfo LSTopoHistory]) + ProfileRequest ProfileAction NodeInfo LSTopoHistory SupervisorPageInfo WorkerSummary WorkerResources]) (:import [org.apache.storm.daemon Shutdownable StormCommon DaemonCommon]) (:import [org.apache.storm.validation ConfigValidation]) (:import [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl 
ClusterUtils]) @@ -98,6 +98,7 @@ (def nimbus:num-getTopologyInfoWithOpts-calls (StormMetricsRegistry/registerMeter "nimbus:num-getTopologyInfoWithOpts-calls")) (def nimbus:num-getTopologyInfo-calls (StormMetricsRegistry/registerMeter "nimbus:num-getTopologyInfo-calls")) (def nimbus:num-getTopologyPageInfo-calls (StormMetricsRegistry/registerMeter "nimbus:num-getTopologyPageInfo-calls")) +(def nimbus:num-getSupervisorPageInfo-calls (StormMetricsRegistry/registerMeter "nimbus:num-getSupervisorPageInfo-calls")) (def nimbus:num-getComponentPageInfo-calls (StormMetricsRegistry/registerMeter "nimbus:num-getComponentPageInfo-calls")) (def nimbus:num-shutdown-calls (StormMetricsRegistry/registerMeter "nimbus:num-shutdown-calls")) @@ -216,6 +217,7 @@ :id->sched-status (atom {}) :node-id->resources (atom {}) ;;resources of supervisors :id->resources (atom {}) ;;resources of topologies + :id->worker-resources (atom {}) ; resources of workers per topology :cred-renewers (AuthUtils/GetCredentialRenewers conf) :topology-history-lock (Object.) 
:topo-history-state (ConfigUtils/nimbusTopoHistoryState conf) @@ -433,7 +435,8 @@ {}) )) -(defn- all-supervisor-info +;; public for testing +(defn all-supervisor-info ([storm-cluster-state] (all-supervisor-info storm-cluster-state nil)) ([storm-cluster-state callback] (let [supervisor-ids (.supervisors storm-cluster-state callback)] @@ -441,10 +444,8 @@ (mapcat (fn [id] (if-let [info (clojurify-supervisor-info (.supervisorInfo storm-cluster-state id))] - [[id info]] - )) - supervisor-ids)) - ))) + [[id info]])) + supervisor-ids))))) (defn- all-scheduling-slots [nimbus topologies missing-assignment-topologies] @@ -623,6 +624,10 @@ (defn- to-executor-id [task-ids] [(first task-ids) (last task-ids)]) +;; convenience method for unit test +(defn get-clojurified-task-info [topology storm-conf] + (clojurify-structure (StormCommon/stormTaskInfo topology storm-conf))) + ;TODO: when translating this function, you should replace the map-val with a proper for loop HERE (defn- compute-executors [nimbus storm-id] (let [conf (:conf nimbus) @@ -631,7 +636,7 @@ component->executors (:component->executors storm-base) storm-conf (read-storm-conf-as-nimbus storm-id blob-store) topology (read-storm-topology-as-nimbus storm-id blob-store) - task->component (clojurify-structure(StormCommon/stormTaskInfo topology storm-conf))] + task->component (get-clojurified-task-info topology storm-conf)] (->> (StormCommon/stormTaskInfo topology storm-conf) (Utils/reverseMap) clojurify-structure @@ -649,7 +654,7 @@ executors (compute-executors nimbus storm-id) topology (read-storm-topology-as-nimbus storm-id blob-store) storm-conf (read-storm-conf-as-nimbus storm-id blob-store) - task->component (clojurify-structure (StormCommon/stormTaskInfo topology storm-conf)) + task->component (get-clojurified-task-info topology storm-conf) executor->component (into {} (for [executor executors :let [start-task (first executor) component (task->component start-task)]] @@ -719,8 +724,7 @@ all-ports (-> (get 
all-scheduling-slots sid) (set/difference dead-ports) ((fn [ports] (map int ports)))) - supervisor-details (SupervisorDetails. sid hostname scheduler-meta all-ports (:resources-map supervisor-info)) - ]] + supervisor-details (SupervisorDetails. sid hostname scheduler-meta all-ports (:resources-map supervisor-info))]] {sid supervisor-details}))] (merge all-supervisor-details (into {} @@ -801,6 +805,9 @@ new-topology->executor->node+port)) +(defrecord TopologyResources [requested-mem-on-heap requested-mem-off-heap requested-cpu + assigned-mem-on-heap assigned-mem-off-heap assigned-cpu]) + ;; public so it can be mocked out (defn compute-new-scheduler-assignments [nimbus existing-assignments topologies scratch-topology-id] (let [conf (:conf nimbus) @@ -838,17 +845,33 @@ (apply merge-with set/union)) supervisors (read-all-supervisor-details nimbus all-scheduling-slots supervisor->dead-ports) - cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment conf) - _ (.setStatusMap cluster (deref (:id->sched-status nimbus))) - ;; call scheduler.schedule to schedule all the topologies - ;; the new assignments for all the topologies are in the cluster object. - _ (.schedule (:scheduler nimbus) topologies cluster) - _ (.setTopologyResourcesMap cluster @(:id->resources nimbus)) - _ (if-not (conf SCHEDULER-DISPLAY-RESOURCE) (.updateAssignedMemoryForTopologyAndSupervisor cluster topologies)) - ;;merge with existing statuses - _ (reset! (:id->sched-status nimbus) (merge (deref (:id->sched-status nimbus)) (.getStatusMap cluster))) - _ (reset! (:node-id->resources nimbus) (.getSupervisorsResourcesMap cluster)) - _ (reset! (:id->resources nimbus) (.getTopologyResourcesMap cluster))] + cluster (Cluster. 
(:inimbus nimbus) supervisors topology->scheduler-assignment conf)] + + ;; set the status map with existing topology statuses + (.setStatusMap cluster (deref (:id->sched-status nimbus))) + ;; call scheduler.schedule to schedule all the topologies + ;; the new assignments for all the topologies are in the cluster object. + (.schedule (:scheduler nimbus) topologies cluster) + + ;;merge with existing statuses + (reset! (:id->sched-status nimbus) (merge (deref (:id->sched-status nimbus)) (.getStatusMap cluster))) + (reset! (:node-id->resources nimbus) (.getSupervisorsResourcesMap cluster)) + + (if-not (conf SCHEDULER-DISPLAY-RESOURCE) + (.updateAssignedMemoryForTopologyAndSupervisor cluster topologies)) + + ; Remove both of swaps below at first opportunity. This is a hack for non-ras scheduler topology and worker resources. + (swap! (:id->resources nimbus) merge (into {} (map (fn [[k v]] [k (->TopologyResources (nth v 0) (nth v 1) (nth v 2) + (nth v 3) (nth v 4) (nth v 5))]) + (.getTopologyResourcesMap cluster)))) + ; Remove this also at first chance + (swap! (:id->worker-resources nimbus) merge + (into {} (map (fn [[k v]] [k (map-val #(doto (WorkerResources.) 
+ (.set_mem_on_heap (nth % 0)) + (.set_mem_off_heap (nth % 1)) + (.set_cpu (nth % 2))) v)]) + (.getWorkerResourcesMap cluster)))) + (.getAssignments cluster))) (defn- map-diff @@ -856,6 +879,48 @@ [m1 m2] (into {} (filter (fn [[k v]] (not= v (m1 k))) m2))) +(defn get-resources-for-topology [nimbus topo-id] + (or (get @(:id->resources nimbus) topo-id) + (try + (let [storm-cluster-state (:storm-cluster-state nimbus) + topology-details (read-topology-details nimbus topo-id) + assigned-resources (->> (clojurify-assignment (.assignmentInfo storm-cluster-state topo-id nil)) + :worker->resources + (vals) + ; Default to [[0 0 0]] if there are no values + (#(or % [[0 0 0]])) + ; [[on-heap, off-heap, cpu]] -> [[on-heap], [off-heap], [cpu]] + (apply map vector) + ; [[on-heap], [off-heap], [cpu]] -> [on-heap-sum, off-heap-sum, cpu-sum] + (map (partial reduce +))) + worker-resources (->TopologyResources (.getTotalRequestedMemOnHeap topology-details) + (.getTotalRequestedMemOffHeap topology-details) + (.getTotalRequestedCpu topology-details) + (nth assigned-resources 0) + (nth assigned-resources 1) + (nth assigned-resources 2))] + (swap! (:id->resources nimbus) assoc topo-id worker-resources) + worker-resources) + (catch KeyNotFoundException e + ; This can happen when a topology is first coming up. + ; It's thrown by the blobstore code. + (log-error e "Failed to get topology details") + (->TopologyResources 0 0 0 0 0 0))))) + +(defn- get-worker-resources-for-topology [nimbus topo-id] + (or (get @(:id->worker-resources nimbus) topo-id) + (try + (let [storm-cluster-state (:storm-cluster-state nimbus) + assigned-resources (->> (clojurify-assignment (.assignmentInfo storm-cluster-state topo-id nil)) + :worker->resources) + worker-resources (into {} (map #(identity {(WorkerSlot. (first (key %)) (second (key %))) + (doto (WorkerResources.) + (.set_mem_on_heap (nth (val %) 0)) + (.set_mem_off_heap (nth (val %) 1)) + (.set_cpu (nth (val %) 2)))}) assigned-resources))] + (swap! 
(:id->worker-resources nimbus) assoc topo-id worker-resources) + worker-resources)))) + (defn changed-executors [executor->node+port new-executor->node+port] (let [executor->node+port (if executor->node+port (sort executor->node+port) nil) new-executor->node+port (if new-executor->node+port (sort new-executor->node+port) nil) @@ -947,6 +1012,10 @@ start-times worker->resources)}))] + (when (not= new-assignments existing-assignments) + (log-debug "RESETTING id->resources and id->worker-resources cache!") + (reset! (:id->resources nimbus) {}) + (reset! (:id->worker-resources nimbus) {})) ;; tasks figure out what tasks to talk to by looking at topology at runtime ;; only log/set when there's been a change to the assignment (doseq [[topology-id assignment] new-assignments @@ -1015,6 +1084,18 @@ (throw (AlreadyAliveException. (str storm-name " is already active")))) )) +(defn try-read-storm-conf [conf storm-id blob-store] + (try-cause + (read-storm-conf-as-nimbus conf storm-id blob-store) + (catch KeyNotFoundException e + (throw (NotAliveException. (str storm-id)))))) + +(defn try-read-storm-conf-from-name [conf storm-name nimbus] + (let [storm-cluster-state (:storm-cluster-state nimbus) + blob-store (:blob-store nimbus) + id (StormCommon/getStormId storm-cluster-state storm-name)] + (try-read-storm-conf conf id blob-store))) + (defn check-authorization! ([nimbus storm-name storm-conf operation context] (let [aclHandler (:authorization-handler nimbus) @@ -1040,6 +1121,15 @@ ([nimbus storm-name storm-conf operation] (check-authorization! nimbus storm-name storm-conf operation (ReqContext/context)))) +;; no-throw version of check-authorization! +(defn is-authorized? + [nimbus conf blob-store operation topology-id] + (let [topology-conf (try-read-storm-conf conf topology-id blob-store) + storm-name (topology-conf TOPOLOGY-NAME)] + (try (check-authorization! 
nimbus storm-name topology-conf operation) + true + (catch AuthorizationException e false)))) + (defn code-ids [blob-store] (let [to-id (reify KeyFilter (filter [this key] (ConfigUtils/getIdFromBlobKey key)))] @@ -1352,25 +1442,56 @@ (defmethod blob-sync :local [conf nimbus] nil) +(defn make-supervisor-summary + [nimbus id info] + (let [ports (set (:meta info)) ;;TODO: this is only true for standalone + sup-sum (SupervisorSummary. (:hostname info) + (:uptime-secs info) + (count ports) + (count (:used-ports info)) + id)] + (.set_total_resources sup-sum (map-val double (:resources-map info))) + (when-let [[total-mem total-cpu used-mem used-cpu] (.get @(:node-id->resources nimbus) id)] + (.set_used_mem sup-sum (Utils/nullToZero used-mem)) + (.set_used_cpu sup-sum (Utils/nullToZero used-cpu))) + (when-let [version (:version info)] (.set_version sup-sum version)) + sup-sum)) + +(defn user-and-supervisor-topos + [nimbus conf blob-store assignments supervisor-id] + (let [topo-id->supervisors + (into {} (for [[topo-id java-assignment] assignments] + (let [assignment (clojurify-assignment java-assignment)] + {topo-id (into #{} + (map #(first (second %)) + (:executor->node+port assignment)))}))) + supervisor-topologies (keys (filter #(get (val %) supervisor-id) topo-id->supervisors))] + {:supervisor-topologies supervisor-topologies + :user-topologies (into #{} (filter (partial is-authorized? nimbus + conf + blob-store + "getTopology") + supervisor-topologies))})) + +(defn topology-assignments + [storm-cluster-state] + (let [assigned-topology-ids (.assignments storm-cluster-state nil)] + (into {} (for [tid assigned-topology-ids] + {tid (.assignmentInfo storm-cluster-state tid nil)})))) + +(defn get-launch-time-secs + [base storm-id] + (if base (:launch-time-secs base) + (throw + (NotAliveException. 
(str storm-id))))) + (defn get-cluster-info [nimbus] (let [storm-cluster-state (:storm-cluster-state nimbus) supervisor-infos (all-supervisor-info storm-cluster-state) ;; TODO: need to get the port info about supervisors... ;; in standalone just look at metadata, otherwise just say N/A? supervisor-summaries (dofor [[id info] supervisor-infos] - (let [ports (set (:meta info)) ;;TODO: this is only true for standalone - sup-sum (SupervisorSummary. (:hostname info) - (:uptime-secs info) - (count ports) - (count (:used-ports info)) - id) ] - ;TODO: when translating this function, you should replace the map-val with a proper for loop HERE - (.set_total_resources sup-sum (map-val double (:resources-map info))) - (when-let [[total-mem total-cpu used-mem used-cpu] (.get @(:node-id->resources nimbus) id)] - (.set_used_mem sup-sum used-mem) - (.set_used_cpu sup-sum used-cpu)) - (when-let [version (:version info)] (.set_version sup-sum version)) - sup-sum)) + (make-supervisor-summary nimbus id info)) nimbus-uptime (. 
(:uptime nimbus) upTime) bases (nimbus-topology-bases storm-cluster-state) nimbuses (.nimbuses storm-cluster-state) @@ -1402,13 +1523,13 @@ (extract-status-str base))] (when-let [owner (:owner base)] (.set_owner topo-summ owner)) (when-let [sched-status (.get @(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status)) - (when-let [resources (.get @(:id->resources nimbus) id)] - (.set_requested_memonheap topo-summ (get resources 0)) - (.set_requested_memoffheap topo-summ (get resources 1)) - (.set_requested_cpu topo-summ (get resources 2)) - (.set_assigned_memonheap topo-summ (get resources 3)) - (.set_assigned_memoffheap topo-summ (get resources 4)) - (.set_assigned_cpu topo-summ (get resources 5))) + (when-let [resources (get-resources-for-topology nimbus id)] + (.set_requested_memonheap topo-summ (:requested-mem-on-heap resources)) + (.set_requested_memoffheap topo-summ (:requested-mem-off-heap resources)) + (.set_requested_cpu topo-summ (:requested-cpu resources)) + (.set_assigned_memonheap topo-summ (:assigned-mem-on-heap resources)) + (.set_assigned_memoffheap topo-summ (:assigned-mem-off-heap resources)) + (.set_assigned_cpu topo-summ (:assigned-cpu resources))) (.set_replication_count topo-summ (get-blob-replication-count (ConfigUtils/masterStormCodeKey id) nimbus)) topo-summ)) ret (ClusterSummary. supervisor-summaries @@ -1470,11 +1591,9 @@ topology-conf operation) topology (try-read-storm-topology storm-id blob-store) - task->component (clojurify-structure (StormCommon/stormTaskInfo topology topology-conf)) + task->component (get-clojurified-task-info topology topology-conf) base (clojurify-storm-base (.stormBase storm-cluster-state storm-id nil)) - launch-time-secs (if base (:launch-time-secs base) - (throw - (NotAliveException. 
(str storm-id)))) + launch-time-secs (get-launch-time-secs base storm-id) assignment (clojurify-assignment (.assignmentInfo storm-cluster-state storm-id nil)) beats (get @(:heartbeats-cache nimbus) storm-id) all-components (set (vals task->component))] @@ -1883,13 +2002,13 @@ )] (when-let [owner (:owner base)] (.set_owner topo-info owner)) (when-let [sched-status (.get @(:id->sched-status nimbus) storm-id)] (.set_sched_status topo-info sched-status)) - (when-let [resources (.get @(:id->resources nimbus) storm-id)] - (.set_requested_memonheap topo-info (get resources 0)) - (.set_requested_memoffheap topo-info (get resources 1)) - (.set_requested_cpu topo-info (get resources 2)) - (.set_assigned_memonheap topo-info (get resources 3)) - (.set_assigned_memoffheap topo-info (get resources 4)) - (.set_assigned_cpu topo-info (get resources 5))) + (when-let [resources (get-resources-for-topology nimbus storm-id)] + (.set_requested_memonheap topo-info (:requested-mem-on-heap resources)) + (.set_requested_memoffheap topo-info (:requested-mem-off-heap resources)) + (.set_requested_cpu topo-info (:requested-cpu resources)) + (.set_assigned_memonheap topo-info (:assigned-mem-on-heap resources)) + (.set_assigned_memoffheap topo-info (:assigned-mem-off-heap resources)) + (.set_assigned_cpu topo-info (:assigned-cpu resources))) (when-let [component->debug (:component->debug base)] ;TODO: when translating this function, you should replace the map-val with a proper for loop HERE (.set_component_debug topo-info (map-val converter/thriftify-debugoptions component->debug))) @@ -1902,6 +2021,7 @@ topology-id (doto (GetInfoOptions.) (.set_num_err_choice NumErrorsChoice/ALL)))) + (^String beginCreateBlob [this ^String blob-key ^SettableBlobMeta blob-meta] @@ -2053,44 +2173,109 @@ (^TopologyPageInfo getTopologyPageInfo [this ^String topo-id ^String window ^boolean include-sys?] 
(.mark nimbus:num-getTopologyPageInfo-calls) - (let [info (get-common-topo-info topo-id "getTopologyPageInfo") - - exec->node+port (:executor->node+port (:assignment info)) + (let [topo-info (get-common-topo-info topo-id "getTopologyPageInfo") + {:keys [storm-name + storm-cluster-state + launch-time-secs + assignment + beats + task->component + topology + base]} topo-info + exec->node+port (:executor->node+port assignment) + node->host (:node->host assignment) + worker->resources (get-worker-resources-for-topology nimbus topo-id) + worker-summaries (StatsUtil/aggWorkerStats topo-id + storm-name + task->component + beats + exec->node+port + node->host + worker->resources + include-sys? + true) ;; this is the topology page, so we know the user is authorized topo-page-info (StatsUtil/aggTopoExecsStats topo-id exec->node+port - (:task->component info) - (:beats info) - (:topology info) + task->component + beats + topology window include-sys? - (:storm-cluster-state info))] - (when-let [owner (:owner (:base info))] + storm-cluster-state)] + (.set_workers topo-page-info worker-summaries) + (when-let [owner (:owner base)] (.set_owner topo-page-info owner)) (when-let [sched-status (.get @(:id->sched-status nimbus) topo-id)] (.set_sched_status topo-page-info sched-status)) - (when-let [resources (.get @(:id->resources nimbus) topo-id)] - (.set_requested_memonheap topo-page-info (get resources 0)) - (.set_requested_memoffheap topo-page-info (get resources 1)) - (.set_requested_cpu topo-page-info (get resources 2)) - (.set_assigned_memonheap topo-page-info (get resources 3)) - (.set_assigned_memoffheap topo-page-info (get resources 4)) - (.set_assigned_cpu topo-page-info (get resources 5))) + (when-let [resources (get-resources-for-topology nimbus topo-id)] + (.set_requested_memonheap topo-page-info (:requested-mem-on-heap resources)) + (.set_requested_memoffheap topo-page-info (:requested-mem-off-heap resources)) + (.set_requested_cpu topo-page-info (:requested-cpu 
resources)) + (.set_assigned_memonheap topo-page-info (:assigned-mem-on-heap resources)) + (.set_assigned_memoffheap topo-page-info (:assigned-mem-off-heap resources)) + (.set_assigned_cpu topo-page-info (:assigned-cpu resources))) (doto topo-page-info - (.set_name (:storm-name info)) - (.set_status (extract-status-str (:base info))) - (.set_uptime_secs (Time/deltaSecs (:launch-time-secs info))) + (.set_name storm-name) + (.set_status (extract-status-str base)) + (.set_uptime_secs (Time/deltaSecs launch-time-secs)) (.set_topology_conf (JSONValue/toJSONString (try-read-storm-conf conf topo-id (:blob-store nimbus)))) (.set_replication_count (get-blob-replication-count (ConfigUtils/masterStormCodeKey topo-id) nimbus))) (when-let [debug-options - (get-in info [:base :component->debug topo-id])] + (get-in topo-info [:base :component->debug topo-id])] (.set_debug_options topo-page-info (converter/thriftify-debugoptions debug-options))) topo-page-info)) + (^SupervisorPageInfo getSupervisorPageInfo + [this + ^String supervisor-id + ^String host + ^boolean include-sys?] + (.mark nimbus:num-getSupervisorPageInfo-calls) + (let [storm-cluster-state (:storm-cluster-state nimbus) + supervisor-infos (all-supervisor-info storm-cluster-state) + host->supervisor-id (Utils/reverseMap (map-val :hostname supervisor-infos)) + supervisor-ids (if (nil? 
supervisor-id) + (get host->supervisor-id host) + [supervisor-id]) + page-info (SupervisorPageInfo.)] + (doseq [sid supervisor-ids] + (let [supervisor-info (get supervisor-infos sid) + sup-sum (make-supervisor-summary nimbus sid supervisor-info) + _ (.add_to_supervisor_summaries page-info sup-sum) + topo-id->assignments (topology-assignments storm-cluster-state) + {:keys [user-topologies + supervisor-topologies]} (user-and-supervisor-topos nimbus + conf + blob-store + topo-id->assignments + sid)] + (doseq [storm-id supervisor-topologies] + (let [topo-info (get-common-topo-info storm-id "getSupervisorPageInfo") + {:keys [storm-name + assignment + beats + task->component]} topo-info + exec->node+port (:executor->node+port assignment) + node->host (:node->host assignment) + worker->resources (get-worker-resources-for-topology nimbus storm-id)] + (doseq [worker-summary (StatsUtil/aggWorkerStats storm-id + storm-name + task->component + beats + exec->node+port + node->host + worker->resources + include-sys? 
+ (boolean (get user-topologies storm-id)) + sid)] + (.add_to_worker_summaries page-info worker-summary)))))) + page-info)) + (^ComponentPageInfo getComponentPageInfo [this ^String topo-id http://git-wip-us.apache.org/repos/asf/storm/blob/f5ad9288/storm-core/src/clj/org/apache/storm/ui/core.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj index 831725a..a14b1b8 100644 --- a/storm-core/src/clj/org/apache/storm/ui/core.clj +++ b/storm-core/src/clj/org/apache/storm/ui/core.clj @@ -39,7 +39,7 @@ TopologyStats CommonAggregateStats ComponentAggregateStats ComponentType BoltAggregateStats SpoutAggregateStats ExecutorAggregateStats SpecificAggregateStats ComponentPageInfo - LogConfig LogLevel LogLevelAction]) + LogConfig LogLevel LogLevelAction SupervisorPageInfo WorkerSummary]) (:import [org.apache.storm.security.auth AuthUtils ReqContext]) (:import [org.apache.storm.generated AuthorizationException ProfileRequest ProfileAction NodeInfo]) (:import [org.apache.storm.security.auth AuthUtils]) @@ -67,6 +67,7 @@ (def ui:num-cluster-configuration-http-requests (StormMetricsRegistry/registerMeter "ui:num-cluster-configuration-http-requests")) (def ui:num-cluster-summary-http-requests (StormMetricsRegistry/registerMeter "ui:num-cluster-summary-http-requests")) (def ui:num-nimbus-summary-http-requests (StormMetricsRegistry/registerMeter "ui:num-nimbus-summary-http-requests")) +(def ui:num-supervisor-http-requests (StormMetricsRegistry/registerMeter "ui:num-supervisor-http-requests")) (def ui:num-supervisor-summary-http-requests (StormMetricsRegistry/registerMeter "ui:num-supervisor-summary-http-requests")) (def ui:num-all-topologies-summary-http-requests (StormMetricsRegistry/registerMeter "ui:num-all-topologies-summary-http-requests")) (def ui:num-topology-page-http-requests (StormMetricsRegistry/registerMeter 
"ui:num-topology-page-http-requests")) @@ -429,26 +430,77 @@ "nimbusUpTime" (UIHelpers/prettyUptimeSec uptime) "nimbusUpTimeSeconds" uptime}))}))) +(defn worker-summary-to-json + [secure? ^WorkerSummary worker-summary] + (let [host (.get_host worker-summary) + port (.get_port worker-summary) + topology-id (.get_topology_id worker-summary) + uptime-secs (.get_uptime_secs worker-summary)] + {"supervisorId" (.get_supervisor_id worker-summary) + "host" host + "port" port + "topologyId" topology-id + "topologyName" (.get_topology_name worker-summary) + "executorsTotal" (.get_num_executors worker-summary) + "assignedMemOnHeap" (.get_assigned_memonheap worker-summary) + "assignedMemOffHeap" (.get_assigned_memoffheap worker-summary) + "assignedCpu" (.get_assigned_cpu worker-summary) + "componentNumTasks" (.get_component_to_num_tasks worker-summary) + "uptime" (UIHelpers/prettyUptimeSec uptime-secs) + "uptimeSeconds" uptime-secs + "workerLogLink" (worker-log-link host port topology-id secure?)})) + +(defn supervisor-summary-to-json + [summary] + (let [slotsTotal (.get_num_workers summary) + slotsUsed (.get_num_used_workers summary) + slotsFree (max (- slotsTotal slotsUsed) 0) + totalMem (get (.get_total_resources summary) Config/SUPERVISOR_MEMORY_CAPACITY_MB) + totalCpu (get (.get_total_resources summary) Config/SUPERVISOR_CPU_CAPACITY) + usedMem (.get_used_mem summary) + usedCpu (.get_used_cpu summary) + availMem (max (- totalMem usedMem) 0) + availCpu (max (- totalCpu usedCpu) 0)] + {"id" (.get_supervisor_id summary) + "host" (.get_host summary) + "uptime" (UIHelpers/prettyUptimeSec (.get_uptime_secs summary)) + "uptimeSeconds" (.get_uptime_secs summary) + "slotsTotal" slotsTotal + "slotsUsed" slotsUsed + "slotsFree" slotsFree + "totalMem" totalMem + "totalCpu" totalCpu + "usedMem" usedMem + "usedCpu" usedCpu + "logLink" (supervisor-log-link (.get_host summary)) + "availMem" availMem + "availCpu" availCpu + "version" (.get_version summary)})) + +(defn supervisor-page-info 
+  ([supervisor-id host include-sys? secure?]
+   (thrift/with-configured-nimbus-connection
+     nimbus (supervisor-page-info (.getSupervisorPageInfo ^Nimbus$Client nimbus
+                                                          supervisor-id
+                                                          host
+                                                          include-sys?) secure?)))
+  ([^SupervisorPageInfo supervisor-page-info secure?]
+   ;; ask nimbus to return supervisor workers + any details the user is allowed
+   ;; to access on a per-topology basis (i.e. components)
+   (let [supervisors-json (map supervisor-summary-to-json (.get_supervisor_summaries supervisor-page-info))]
+     {"supervisors" supervisors-json
+      "schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)
+      "workers" (into [] (for [^WorkerSummary worker-summary (.get_worker_summaries supervisor-page-info)]
+                           (worker-summary-to-json secure? worker-summary)))})))
+
 (defn supervisor-summary
   ([]
    (thrift/with-configured-nimbus-connection nimbus
      (supervisor-summary
       (.get_supervisors (.getClusterInfo ^Nimbus$Client nimbus)))))
   ([summs]
-   {"supervisors"
-    (for [^SupervisorSummary s summs]
-      {"id" (.get_supervisor_id s)
-       "host" (.get_host s)
-       "uptime" (UIHelpers/prettyUptimeSec (.get_uptime_secs s))
-       "uptimeSeconds" (.get_uptime_secs s)
-       "slotsTotal" (.get_num_workers s)
-       "slotsUsed" (.get_num_used_workers s)
-       "totalMem" (get (.get_total_resources s) Config/SUPERVISOR_MEMORY_CAPACITY_MB)
-       "totalCpu" (get (.get_total_resources s) Config/SUPERVISOR_CPU_CAPACITY)
-       "usedMem" (.get_used_mem s)
-       "usedCpu" (.get_used_cpu s)
-       "logLink" (supervisor-log-link (.get_host s))
-       "version" (.get_version s)})
+   {"supervisors" (for [^SupervisorSummary s summs]
+                    (supervisor-summary-to-json s))
     "schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)}))
 
 (defn all-topologies-summary
@@ -608,6 +660,8 @@
        "assignedTotalMem" (+ (.get_assigned_memonheap topo-info) (.get_assigned_memoffheap topo-info))
        "assignedCpu" (.get_assigned_cpu topo-info)
        "topologyStats" topo-stats
+       "workers" (map (partial worker-summary-to-json secure?)
+                      (.get_workers topo-info))
        "spouts" (map (partial comp-agg-stats-json id secure?)
                      (.get_id_to_spout_agg_stats topo-info))
        "bolts" (map (partial comp-agg-stats-json id secure?)
@@ -1070,6 +1124,16 @@
        (assert-authorized-user "getClusterInfo")
        (json-response (assoc (supervisor-summary)
                         "logviewerPort" (*STORM-CONF* LOGVIEWER-PORT)) (:callback m)))
+  (GET "/api/v1/supervisor" [:as {:keys [cookies servlet-request scheme]} & m]
+       (.mark ui:num-supervisor-http-requests)
+       (populate-context! servlet-request)
+       (assert-authorized-user "getSupervisorPageInfo")
+       ;; the supervisor endpoint accepts either an id or a host query parameter
+       ;; (technically both); if both id and host are provided, the id wins
+       (let [id (:id m)
+             host (:host m)]
+         (json-response (assoc (supervisor-page-info id host (check-include-sys? (:sys m)) (= scheme :https))
+                          "logviewerPort" (*STORM-CONF* LOGVIEWER-PORT)) (:callback m))))
   (GET "/api/v1/topology/summary" [:as {:keys [cookies servlet-request]} & m]
        (.mark ui:num-all-topologies-summary-http-requests)
        (populate-context! servlet-request)
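The new `supervisor-summary-to-json` derives `slotsFree`, `availMem`, and `availCpu` by subtracting used from total and clamping at zero, so a supervisor that is over-committed reports zero available rather than a negative number. A minimal Python sketch of that arithmetic (the function name and the sample numbers are illustrative, not from the patch):

```python
def derived_supervisor_fields(slots_total, slots_used,
                              total_mem, used_mem,
                              total_cpu, used_cpu):
    """Mirror the clamped subtractions used by supervisor-summary-to-json."""
    return {
        "slotsFree": max(slots_total - slots_used, 0),
        "availMem": max(total_mem - used_mem, 0),
        "availCpu": max(total_cpu - used_cpu, 0),
    }

# Example: 4 slots (3 used), 4096 MB memory (3000 MB used),
# 400% CPU capacity (350% used) -- example numbers only.
print(derived_supervisor_fields(4, 3, 4096.0, 3000.0, 400.0, 350.0))
# → {'slotsFree': 1, 'availMem': 1096.0, 'availCpu': 50.0}
```

Clamping matters because the scheduler can temporarily assign more than a supervisor's advertised capacity, and the UI should not render negative availability.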

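The comment on the new `/api/v1/supervisor` route says the endpoint accepts an `id` or a `host` query parameter and that the id wins when both are given. A hedged Python sketch of that precedence rule (the function is hypothetical; only the rule itself comes from the patch):

```python
def resolve_supervisor_filter(query_params):
    """Pick the lookup key for /api/v1/supervisor: 'id' beats 'host' when both are present."""
    sup_id = query_params.get("id")
    host = query_params.get("host")
    if sup_id is not None:
        return ("id", sup_id)
    if host is not None:
        return ("host", host)
    # neither filter given: nimbus decides what to return
    return (None, None)

print(resolve_supervisor_filter({"id": "sup-1", "host": "node9"}))
# → ('id', 'sup-1')
```

In the actual route both parameters are forwarded to nimbus via `getSupervisorPageInfo`, so the precedence is applied on the nimbus side rather than in the UI handler.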