STORM-1994: Add table with per-topology and worker resource usage and components in (new) supervisor and topology pages
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0e0bcf27 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0e0bcf27 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0e0bcf27 Branch: refs/heads/1.x-branch Commit: 0e0bcf27f4b7787cc3e6886ccbcd5dc55daef771 Parents: ce38849 Author: Alessandro Bellina <[email protected]> Authored: Wed Jul 6 14:23:18 2016 -0500 Committer: Alessandro Bellina <[email protected]> Committed: Sun Aug 21 22:31:08 2016 -0500 ---------------------------------------------------------------------- docs/STORM-UI-REST-API.md | 121 +- docs/images/supervisor_page.png | Bin 0 -> 133290 bytes .../src/clj/org/apache/storm/daemon/nimbus.clj | 296 +- storm-core/src/clj/org/apache/storm/stats.clj | 68 +- storm-core/src/clj/org/apache/storm/ui/core.clj | 94 +- .../org/apache/storm/generated/Assignment.java | 244 +- .../storm/generated/ClusterWorkerHeartbeat.java | 52 +- .../storm/generated/ComponentPageInfo.java | 220 +- .../org/apache/storm/generated/Credentials.java | 44 +- .../jvm/org/apache/storm/generated/HBNodes.java | 32 +- .../org/apache/storm/generated/HBRecords.java | 36 +- .../storm/generated/LSApprovedWorkers.java | 44 +- .../generated/LSSupervisorAssignments.java | 48 +- .../apache/storm/generated/LSTopoHistory.java | 64 +- .../storm/generated/LSTopoHistoryList.java | 36 +- .../storm/generated/LSWorkerHeartbeat.java | 36 +- .../apache/storm/generated/ListBlobsResult.java | 32 +- .../apache/storm/generated/LocalAssignment.java | 36 +- .../apache/storm/generated/LocalStateData.java | 48 +- .../org/apache/storm/generated/LogConfig.java | 48 +- .../jvm/org/apache/storm/generated/Nimbus.java | 3486 ++++++++++++------ .../org/apache/storm/generated/NodeInfo.java | 32 +- .../storm/generated/RebalanceOptions.java | 44 +- .../storm/generated/SettableBlobMeta.java | 36 +- .../org/apache/storm/generated/StormBase.java | 92 +- .../apache/storm/generated/SupervisorInfo.java | 152 +- .../storm/generated/SupervisorPageInfo.java | 624 ++++ .../storm/generated/TopologyHistoryInfo.java | 32 +- .../storm/generated/TopologyPageInfo.java | 284 +- .../apache/storm/generated/WorkerSummary.java | 1880 ++++++++++ .../jvm/org/apache/storm/scheduler/Cluster.java | 217 +- .../resource/ResourceAwareScheduler.java | 9 + .../auth/authorizer/SimpleACLAuthorizer.java | 7 +- storm-core/src/py/storm/Nimbus-remote | 7 + storm-core/src/py/storm/Nimbus.py | 272 +- storm-core/src/py/storm/ttypes.py | 1457 ++++++-- storm-core/src/storm.thrift | 25 + storm-core/src/ui/public/component.html | 8 + storm-core/src/ui/public/css/style.css | 20 + storm-core/src/ui/public/js/script.js | 191 + storm-core/src/ui/public/supervisor.html | 132 + .../public/templates/index-page-template.html | 4 +- .../templates/supervisor-page-template.html | 145 + .../templates/topology-page-template.html | 208 +- storm-core/src/ui/public/topology.html | 12 +- .../test/clj/org/apache/storm/nimbus_test.clj | 72 +- .../test/clj/org/apache/storm/stats_test.clj | 134 + 47 files changed, 8820 insertions(+), 2361 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/0e0bcf27/docs/STORM-UI-REST-API.md ---------------------------------------------------------------------- diff --git a/docs/STORM-UI-REST-API.md b/docs/STORM-UI-REST-API.md index 884c6d5..340137d 100644 --- a/docs/STORM-UI-REST-API.md +++ b/docs/STORM-UI-REST-API.md @@ -125,6 +125,7 @@ Response fields: |uptimeSeconds| Integer| Shows how long the supervisor is running in seconds| |slotsTotal| Integer| Total number of available worker slots for this supervisor| |slotsUsed| Integer| Number of worker slots used on this supervisor| +|schedulerDisplayResource| Boolean | Whether to display scheduler resource information| |totalMem| Double| Total memory capacity on this supervisor| |totalCpu| Double| Total CPU capacity on this supervisor| |usedMem| Double| Used memory capacity on this supervisor| @@ -207,6 +208,123 @@ Sample response: } ``` +### /api/v1/supervisor (GET) + +Returns summary for a supervisor by id, or all supervisors running on a host. + +Examples: + +```no-highlight + 1. By host: http://ui-daemon-host-name:8080/api/v1/supervisor?host=supervisor-daemon-host-name + 2. By id: http://ui-daemon-host-name:8080/api/v1/supervisor?id=f5449110-1daa-43e2-89e3-69917b16dec9-192.168.1.1 +``` + +Request parameters: + +|Parameter |Value |Description | +|----------|--------|-------------| +|id |String. Supervisor id | If specified, respond with the supervisor and worker stats with id. Note that when id is specified, the host argument is ignored. | +|host |String. Host name| If specified, respond with all supervisors and worker stats in the host (normally just one)| +|sys |String. Values 1 or 0. Default value 0| Controls including sys stats part of the response| + +Response fields: + +|Field |Value|Description| +|--- |--- |--- +|supervisors| Array| Array of supervisor summaries| +|workers| Array| Array of worker summaries | +|schedulerDisplayResource| Boolean | Whether to display scheduler resource information| + +Each supervisor is defined by: + +|Field |Value|Description| +|--- |--- |--- +|id| String | Supervisor's id| +|host| String| Supervisor's host name| +|uptime| String| Shows how long the supervisor is running| +|uptimeSeconds| Integer| Shows how long the supervisor is running in seconds| +|slotsTotal| Integer| Total number of worker slots for this supervisor| +|slotsUsed| Integer| Number of worker slots used on this supervisor| +|totalMem| Double| Total memory capacity on this supervisor| +|totalCpu| Double| Total CPU capacity on this supervisor| +|usedMem| Double| Used memory capacity on this supervisor| +|usedCpu| Double| Used CPU capacity on this supervisor| + +Each worker is defined by: + +|Field |Value |Description| +|-------|-------|-----------| +|supervisorId | String| Supervisor's id| +|host | String | Worker's host name| +|port | Integer | Worker's port| +|topologyId | String | Topology Id| +|topologyName | String | Topology Name| +|executorsTotal | Integer | Number of executors used by the topology in this worker| +|assignedMemOnHeap | Double | Assigned On-Heap Memory by Scheduler (MB)| +|assignedMemOffHeap | Double | Assigned Off-Heap Memory by Scheduler (MB)| +|assignedCpu | Number | Assigned CPU by Scheduler (%)| +|componentNumTasks | Dictionary | Components -> # of executing tasks| +|uptime| String| Shows how long the worker is running| +|uptimeSeconds| Integer| Shows how long the worker is running in seconds| +|workerLogLink | String | Link to worker log viewer page| + +Sample response: + +```json +{ + "supervisors": [{ + "totalMem": 4096.0, + "host":"192.168.10.237", + "id":"bdfe8eff-f1d8-4bce-81f5-9d3ae1bf432e-169.254.129.212", + "uptime":"7m 8s", + "totalCpu":400.0, + "usedCpu":495.0, + "usedMem":3432.0, + "slotsUsed":2, + "version":"0.10.1", + "slotsTotal":4, + "uptimeSeconds":428 + }], + "schedulerDisplayResource":true, + "workers":[{ + "topologyName":"ras", + "topologyId":"ras-4-1460229987", + "host":"192.168.10.237", + "supervisorId":"bdfe8eff-f1d8-4bce-81f5-9d3ae1bf432e-169.254.129.212", + "assignedMemOnHeap":704.0, + "uptime":"2m 47s", + "uptimeSeconds":167, + "port":6707, + "workerLogLink":"http:\/\/192.168.10.237:8000\/log?file=ras-4-1460229987%2F6707%2Fworker.log", + "componentNumTasks": { + "word":5 + }, + "executorsTotal":8, + "assignedCpu":130.0, + "assignedMemOffHeap":80.0 + }, + { + "topologyName":"ras", + "topologyId":"ras-4-1460229987", + "host":"192.168.10.237", + "supervisorId":"bdfe8eff-f1d8-4bce-81f5-9d3ae1bf432e-169.254.129.212", + "assignedMemOnHeap":904.0, + "uptime":"2m 53s", + "port":6706, + "workerLogLink":"http:\/\/192.168.10.237:8000\/log?file=ras-4-1460229987%2F6706%2Fworker.log", + "componentNumTasks":{ + "exclaim2":2, + "exclaim1":3, + "word":5 + }, + "executorsTotal":10, + "uptimeSeconds":173, + "assignedCpu":165.0, + "assignedMemOffHeap":80.0 + }] +} +``` + ### /api/v1/topology/summary (GET) Returns summary information for all topologies. @@ -232,6 +350,7 @@ Response fields: |assignedMemOffHeap| Double|Assigned Off-Heap Memory by Scheduler (MB)| |assignedTotalMem| Double|Assigned Total Memory by Scheduler (MB)| |assignedCpu| Double|Assigned CPU by Scheduler (%)| +|schedulerDisplayResource| Boolean | Whether to display scheduler resource information| Sample response: @@ -257,7 +376,7 @@ Sample response: "assignedTotalMem": 768, "assignedCpu": 80 } - ] + ], "schedulerDisplayResource": true } ``` http://git-wip-us.apache.org/repos/asf/storm/blob/0e0bcf27/docs/images/supervisor_page.png ---------------------------------------------------------------------- diff --git a/docs/images/supervisor_page.png b/docs/images/supervisor_page.png new file mode 100644 index 0000000..5133681 Binary files /dev/null and b/docs/images/supervisor_page.png differ http://git-wip-us.apache.org/repos/asf/storm/blob/0e0bcf27/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index 29d9f28..c17e2fd 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -45,7 +45,7 @@ KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo TopologyHistoryInfo ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice SettableBlobMeta ReadableBlobMeta BeginDownloadResult ListBlobsResult ComponentPageInfo TopologyPageInfo LogConfig LogLevel LogLevelAction - ProfileRequest ProfileAction NodeInfo]) + ProfileRequest ProfileAction NodeInfo SupervisorPageInfo WorkerSummary WorkerResources]) (:import [org.apache.storm.daemon Shutdownable]) (:import [org.apache.storm.cluster ClusterStateContext DaemonType]) (:use [org.apache.storm util config log timer zookeeper local-state]) @@ -95,6 +95,7 @@ (defmeter nimbus:num-getTopologyInfoWithOpts-calls) (defmeter nimbus:num-getTopologyInfo-calls) (defmeter nimbus:num-getTopologyPageInfo-calls) +(defmeter nimbus:num-getSupervisorPageInfo-calls) (defmeter nimbus:num-getComponentPageInfo-calls) (defmeter nimbus:num-shutdown-calls) @@ -210,6 +211,7 @@ :id->sched-status (atom {}) :node-id->resources (atom {}) ;;resources of supervisors :id->resources (atom {}) ;;resources of topologies + :id->worker-resources (atom {}) ; resources of workers per topology :cred-renewers (AuthUtils/GetCredentialRenewers conf) :topology-history-lock (Object.) :topo-history-state (nimbus-topo-history-state conf) @@ -428,7 +430,8 @@ {}) )) -(defn- all-supervisor-info +;; public for testing +(defn all-supervisor-info ([storm-cluster-state] (all-supervisor-info storm-cluster-state nil)) ([storm-cluster-state callback] (let [supervisor-ids (.supervisors storm-cluster-state callback)] @@ -738,8 +741,7 @@ all-ports (-> (get all-scheduling-slots sid) (set/difference dead-ports) ((fn [ports] (map int ports)))) - supervisor-details (SupervisorDetails. sid hostname scheduler-meta all-ports (:resources-map supervisor-info)) - ]] + supervisor-details (SupervisorDetails. sid hostname scheduler-meta all-ports (:resources-map supervisor-info))]] {sid supervisor-details}))] (merge all-supervisor-details (into {} @@ -818,6 +820,9 @@ new-topology->executor->node+port)) +(defrecord TopologyResources [requested-mem-on-heap requested-mem-off-heap requested-cpu + assigned-mem-on-heap assigned-mem-off-heap assigned-cpu]) + ;; public so it can be mocked out (defn compute-new-scheduler-assignments [nimbus existing-assignments topologies scratch-topology-id] (let [conf (:conf nimbus) @@ -855,19 +860,77 @@ (apply merge-with set/union)) supervisors (read-all-supervisor-details nimbus all-scheduling-slots supervisor->dead-ports) - cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment conf) - _ (.setStatusMap cluster (deref (:id->sched-status nimbus))) - ;; call scheduler.schedule to schedule all the topologies - ;; the new assignments for all the topologies are in the cluster object. - _ (.schedule (:scheduler nimbus) topologies cluster) - _ (.setTopologyResourcesMap cluster @(:id->resources nimbus)) - _ (if-not (conf SCHEDULER-DISPLAY-RESOURCE) (.updateAssignedMemoryForTopologyAndSupervisor cluster topologies)) - ;;merge with existing statuses - _ (reset! (:id->sched-status nimbus) (merge (deref (:id->sched-status nimbus)) (.getStatusMap cluster))) - _ (reset! (:node-id->resources nimbus) (.getSupervisorsResourcesMap cluster)) - _ (reset! (:id->resources nimbus) (.getTopologyResourcesMap cluster))] + cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment conf)] + + ;; set the status map with existing topology statuses + (.setStatusMap cluster (deref (:id->sched-status nimbus))) + ;; call scheduler.schedule to schedule all the topologies + ;; the new assignments for all the topologies are in the cluster object. + (.schedule (:scheduler nimbus) topologies cluster) + + ;;merge with existing statuses + (reset! (:id->sched-status nimbus) (merge (deref (:id->sched-status nimbus)) (.getStatusMap cluster))) + (reset! (:node-id->resources nimbus) (.getSupervisorsResourcesMap cluster)) + + (if-not (conf SCHEDULER-DISPLAY-RESOURCE) + (.updateAssignedMemoryForTopologyAndSupervisor cluster topologies)) + + ; Remove both of swaps below at first opportunity. This is a hack for non-ras scheduler topology and worker resources. + (swap! (:id->resources nimbus) merge (into {} (map (fn [[k v]] [k (->TopologyResources (nth v 0) (nth v 1) (nth v 2) + (nth v 3) (nth v 4) (nth v 5))]) + (.getTopologyResourcesMap cluster)))) + ; Remove this also at first chance + (swap! (:id->worker-resources nimbus) merge + (into {} (map (fn [[k v]] [k (map-val #(doto (WorkerResources.) + (.set_mem_on_heap (nth % 0)) + (.set_mem_off_heap (nth % 1)) + (.set_cpu (nth % 2))) v)]) + (.getWorkerResourcesMap cluster)))) + (.getAssignments cluster))) +(defn get-resources-for-topology [nimbus topo-id] + (or (get @(:id->resources nimbus) topo-id) + (try + (let [storm-cluster-state (:storm-cluster-state nimbus) + topology-details (read-topology-details nimbus topo-id) + assigned-resources (->> (.assignment-info storm-cluster-state topo-id nil) + :worker->resources + (vals) + ; Default to [[0 0 0]] if there are no values + (#(or % [[0 0 0]])) + ; [[on-heap, off-heap, cpu]] -> [[on-heap], [off-heap], [cpu]] + (apply map vector) + ; [[on-heap], [off-heap], [cpu]] -> [on-heap-sum, off-heap-sum, cpu-sum] + (map (partial reduce +))) + worker-resources (->TopologyResources (.getTotalRequestedMemOnHeap topology-details) + (.getTotalRequestedMemOffHeap topology-details) + (.getTotalRequestedCpu topology-details) + (nth assigned-resources 0) + (nth assigned-resources 1) + (nth assigned-resources 2))] + (swap! (:id->resources nimbus) assoc topo-id worker-resources) + worker-resources) + (catch KeyNotFoundException e + ; This can happen when a topology is first coming up. + ; It's thrown by the blobstore code. + (log-error e "Failed to get topology details") + (->TopologyResources 0 0 0 0 0 0))))) + +(defn- get-worker-resources-for-topology [nimbus topo-id] + (or (get @(:id->worker-resources nimbus) topo-id) + (try + (let [storm-cluster-state (:storm-cluster-state nimbus) + assigned-resources (->> (.assignment-info storm-cluster-state topo-id nil) + :worker->resources) + worker-resources (into {} (map #(identity {(WorkerSlot. (first (key %)) (second (key %))) + (doto (WorkerResources.) + (.set_mem_on_heap (nth (val %) 0)) + (.set_mem_off_heap (nth (val %) 1)) + (.set_cpu (nth (val %) 2)))}) assigned-resources))] + (swap! (:id->worker-resources nimbus) assoc topo-id worker-resources) + worker-resources)))) + (defn changed-executors [executor->node+port new-executor->node+port] (let [executor->node+port (if executor->node+port (sort executor->node+port) nil) new-executor->node+port (if new-executor->node+port (sort new-executor->node+port) nil) @@ -959,6 +1022,10 @@ start-times worker->resources)}))] + (when (not= new-assignments existing-assignments) + (log-debug "RESETTING id->resources and id->worker-resources cache!") + (reset! (:id->resources nimbus) {}) + (reset! (:id->worker-resources nimbus) {})) ;; tasks figure out what tasks to talk to by looking at topology at runtime ;; only log/set when there's been a change to the assignment (doseq [[topology-id assignment] new-assignments @@ -1026,6 +1093,18 @@ (throw (AlreadyAliveException. (str storm-name " is already active")))) )) +(defn try-read-storm-conf [conf storm-id blob-store] + (try-cause + (read-storm-conf-as-nimbus conf storm-id blob-store) + (catch KeyNotFoundException e + (throw (NotAliveException. (str storm-id)))))) + +(defn try-read-storm-conf-from-name [conf storm-name nimbus] + (let [storm-cluster-state (:storm-cluster-state nimbus) + blob-store (:blob-store nimbus) + id (get-storm-id storm-cluster-state storm-name)] + (try-read-storm-conf conf id blob-store))) + (defn check-authorization! ([nimbus storm-name storm-conf operation context] (let [aclHandler (:authorization-handler nimbus) @@ -1051,6 +1130,15 @@ ([nimbus storm-name storm-conf operation] (check-authorization! nimbus storm-name storm-conf operation (ReqContext/context)))) +;; no-throw version of check-authorization! +(defn is-authorized? + [nimbus conf blob-store operation topology-id] + (let [topology-conf (try-read-storm-conf conf topology-id blob-store) + storm-name (topology-conf TOPOLOGY-NAME)] + (try (check-authorization! nimbus storm-name topology-conf operation) + true + (catch AuthorizationException e false)))) + (defn code-ids [blob-store] (let [to-id (reify KeyFilter (filter [this key] (get-id-from-blob-key key)))] @@ -1355,24 +1443,55 @@ (defmethod blob-sync :local [conf nimbus] nil) +(defn make-supervisor-summary + [nimbus id info] + (let [ports (set (:meta info)) ;;TODO: this is only true for standalone + sup-sum (SupervisorSummary. (:hostname info) + (:uptime-secs info) + (count ports) + (count (:used-ports info)) + id)] + (.set_total_resources sup-sum (map-val double (:resources-map info))) + (when-let [[total-mem total-cpu used-mem used-cpu] (.get @(:node-id->resources nimbus) id)] + (.set_used_mem sup-sum (or used-mem 0)) + (.set_used_cpu sup-sum (or used-cpu 0))) + (when-let [version (:version info)] (.set_version sup-sum version)) + sup-sum)) + +(defn user-and-supervisor-topos + [nimbus conf blob-store assignments supervisor-id] + (let [topo-id->supervisors + (into {} (for [[topo-id assignment] assignments] + {topo-id (into #{} + (map #(first (second %)) + (:executor->node+port assignment)))})) + supervisor-topologies (keys (filter #(get (val %) supervisor-id) topo-id->supervisors))] + {:supervisor-topologies supervisor-topologies + :user-topologies (into #{} (filter (partial is-authorized? nimbus + conf + blob-store + "getTopology") + supervisor-topologies))})) + +(defn topology-assignments + [storm-cluster-state] + (let [assigned-topology-ids (.assignments storm-cluster-state nil)] + (into {} (for [tid assigned-topology-ids] + {tid (.assignment-info storm-cluster-state tid nil)})))) + +(defn get-launch-time-secs + [base storm-id] + (if base (:launch-time-secs base) + (throw + (NotAliveException. (str storm-id))))) + (defn get-cluster-info [nimbus] (let [storm-cluster-state (:storm-cluster-state nimbus) supervisor-infos (all-supervisor-info storm-cluster-state) ;; TODO: need to get the port info about supervisors... ;; in standalone just look at metadata, otherwise just say N/A? supervisor-summaries (dofor [[id info] supervisor-infos] - (let [ports (set (:meta info)) ;;TODO: this is only true for standalone - sup-sum (SupervisorSummary. (:hostname info) - (:uptime-secs info) - (count ports) - (count (:used-ports info)) - id) ] - (.set_total_resources sup-sum (map-val double (:resources-map info))) - (when-let [[total-mem total-cpu used-mem used-cpu] (.get @(:node-id->resources nimbus) id)] - (.set_used_mem sup-sum used-mem) - (.set_used_cpu sup-sum used-cpu)) - (when-let [version (:version info)] (.set_version sup-sum version)) - sup-sum)) + (make-supervisor-summary nimbus id info)) nimbus-uptime ((:uptime nimbus)) bases (topology-bases storm-cluster-state) nimbuses (.nimbuses storm-cluster-state) @@ -1404,13 +1523,13 @@ (extract-status-str base))] (when-let [owner (:owner base)] (.set_owner topo-summ owner)) (when-let [sched-status (.get @(:id->sched-status nimbus) id)] (.set_sched_status topo-summ sched-status)) - (when-let [resources (.get @(:id->resources nimbus) id)] - (.set_requested_memonheap topo-summ (get resources 0)) - (.set_requested_memoffheap topo-summ (get resources 1)) - (.set_requested_cpu topo-summ (get resources 2)) - (.set_assigned_memonheap topo-summ (get resources 3)) - (.set_assigned_memoffheap topo-summ (get resources 4)) - (.set_assigned_cpu topo-summ (get resources 5))) + (when-let [resources (get-resources-for-topology nimbus id)] + (.set_requested_memonheap topo-summ (:requested-mem-on-heap resources)) + (.set_requested_memoffheap topo-summ (:requested-mem-off-heap resources)) + (.set_requested_cpu topo-summ (:requested-cpu resources)) + (.set_assigned_memonheap topo-summ (:assigned-mem-on-heap resources)) + (.set_assigned_memoffheap topo-summ (:assigned-mem-off-heap resources)) + (.set_assigned_cpu topo-summ (:assigned-cpu resources))) (.set_replication_count topo-summ (get-blob-replication-count (master-stormcode-key id) nimbus)) topo-summ)) ret (ClusterSummary. supervisor-summaries @@ -1469,9 +1588,7 @@ topology (try-read-storm-topology storm-id blob-store) task->component (storm-task-info topology topology-conf) base (.storm-base storm-cluster-state storm-id nil) - launch-time-secs (if base (:launch-time-secs base) - (throw - (NotAliveException. (str storm-id)))) + launch-time-secs (get-launch-time-secs base storm-id) assignment (.assignment-info storm-cluster-state storm-id nil) beats (map-val :heartbeat (get @(:heartbeats-cache nimbus) storm-id)) @@ -1874,13 +1991,13 @@ )] (when-let [owner (:owner base)] (.set_owner topo-info owner)) (when-let [sched-status (.get @(:id->sched-status nimbus) storm-id)] (.set_sched_status topo-info sched-status)) - (when-let [resources (.get @(:id->resources nimbus) storm-id)] - (.set_requested_memonheap topo-info (get resources 0)) - (.set_requested_memoffheap topo-info (get resources 1)) - (.set_requested_cpu topo-info (get resources 2)) - (.set_assigned_memonheap topo-info (get resources 3)) - (.set_assigned_memoffheap topo-info (get resources 4)) - (.set_assigned_cpu topo-info (get resources 5))) + (when-let [resources (get-resources-for-topology nimbus storm-id)] + (.set_requested_memonheap topo-info (:requested-mem-on-heap resources)) + (.set_requested_memoffheap topo-info (:requested-mem-off-heap resources)) + (.set_requested_cpu topo-info (:requested-cpu resources)) + (.set_assigned_memonheap topo-info (:assigned-mem-on-heap resources)) + (.set_assigned_memoffheap topo-info (:assigned-mem-off-heap resources)) + (.set_assigned_cpu topo-info (:assigned-cpu resources))) (when-let [component->debug (:component->debug base)] (.set_component_debug topo-info (map-val converter/thriftify-debugoptions component->debug))) (.set_replication_count topo-info (get-blob-replication-count (master-stormcode-key storm-id) nimbus)) @@ -2046,45 +2163,98 @@ (^TopologyPageInfo getTopologyPageInfo [this ^String topo-id ^String window ^boolean include-sys?] (mark! nimbus:num-getTopologyPageInfo-calls) - (let [info (get-common-topo-info topo-id "getTopologyPageInfo") - - exec->node+port (:executor->node+port (:assignment info)) + (let [topo-info (get-common-topo-info topo-id "getTopologyPageInfo") + {:keys [storm-name + storm-cluster-state + launch-time-secs + assignment + beats + task->component + topology + base]} topo-info + exec->node+port (:executor->node+port assignment) + node->host (:node->host assignment) + worker->resources (get-worker-resources-for-topology nimbus topo-id) + worker-summaries (stats/agg-worker-stats topo-id + topo-info + worker->resources + include-sys? + true) ;; this is the topology page, so we know the user is authorized + + exec->node+port (:executor->node+port assignment) last-err-fn (partial get-last-error - (:storm-cluster-state info) + storm-cluster-state topo-id) topo-page-info (stats/agg-topo-execs-stats topo-id exec->node+port - (:task->component info) - (:beats info) - (:topology info) + task->component + beats + topology window include-sys? last-err-fn)] - (when-let [owner (:owner (:base info))] + (.set_workers topo-page-info worker-summaries) + (when-let [owner (:owner base)] (.set_owner topo-page-info owner)) (when-let [sched-status (.get @(:id->sched-status nimbus) topo-id)] (.set_sched_status topo-page-info sched-status)) - (when-let [resources (.get @(:id->resources nimbus) topo-id)] - (.set_requested_memonheap topo-page-info (get resources 0)) - (.set_requested_memoffheap topo-page-info (get resources 1)) - (.set_requested_cpu topo-page-info (get resources 2)) - (.set_assigned_memonheap topo-page-info (get resources 3)) - (.set_assigned_memoffheap topo-page-info (get resources 4)) - (.set_assigned_cpu topo-page-info (get resources 5))) + (when-let [resources (get-resources-for-topology nimbus topo-id)] + (.set_requested_memonheap topo-page-info (:requested-mem-on-heap resources)) + (.set_requested_memoffheap topo-page-info (:requested-mem-off-heap resources)) + (.set_requested_cpu topo-page-info (:requested-cpu resources)) + (.set_assigned_memonheap topo-page-info (:assigned-mem-on-heap resources)) + (.set_assigned_memoffheap topo-page-info (:assigned-mem-off-heap resources)) + (.set_assigned_cpu topo-page-info (:assigned-cpu resources))) (doto topo-page-info - (.set_name (:storm-name info)) - (.set_status (extract-status-str (:base info))) - (.set_uptime_secs (time-delta (:launch-time-secs info))) + (.set_name storm-name) + (.set_status (extract-status-str base)) + (.set_uptime_secs (time-delta launch-time-secs)) (.set_topology_conf (to-json (try-read-storm-conf conf topo-id (:blob-store nimbus)))) (.set_replication_count (get-blob-replication-count (master-stormcode-key topo-id) nimbus))) (when-let [debug-options - (get-in info [:base :component->debug topo-id])] + (get-in topo-info [:base :component->debug topo-id])] (.set_debug_options topo-page-info (converter/thriftify-debugoptions debug-options))) topo-page-info)) + (^SupervisorPageInfo getSupervisorPageInfo + [this + ^String supervisor-id + ^String host + ^boolean include-sys?] + (.mark nimbus:num-getSupervisorPageInfo-calls) + (let [storm-cluster-state (:storm-cluster-state nimbus) + supervisor-infos (all-supervisor-info storm-cluster-state) + host->supervisor-id (reverse-map (map-val :hostname supervisor-infos)) + supervisor-ids (if (nil? supervisor-id) + (get host->supervisor-id host) + [supervisor-id]) + page-info (SupervisorPageInfo.)] + (doseq [sid supervisor-ids] + (let [supervisor-info (get supervisor-infos sid) + sup-sum (make-supervisor-summary nimbus sid supervisor-info) + _ (.add_to_supervisor_summaries page-info sup-sum) + topo-id->assignments (topology-assignments storm-cluster-state) + {:keys [user-topologies + supervisor-topologies]} (user-and-supervisor-topos nimbus + conf + blob-store + topo-id->assignments + sid)] + (doseq [storm-id supervisor-topologies] + (let [topo-info (get-common-topo-info storm-id "getSupervisorPageInfo") + worker->resources (get-worker-resources-for-topology nimbus storm-id)] + (doseq [worker-summary (stats/agg-worker-stats storm-id + topo-info + worker->resources + include-sys? + (get user-topologies storm-id) + sid)] + (.add_to_worker_summaries page-info worker-summary)))))) + page-info)) + (^ComponentPageInfo getComponentPageInfo [this ^String topo-id http://git-wip-us.apache.org/repos/asf/storm/blob/0e0bcf27/storm-core/src/clj/org/apache/storm/stats.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/stats.clj b/storm-core/src/clj/org/apache/storm/stats.clj index 26a4eb4..17d0219 100644 --- a/storm-core/src/clj/org/apache/storm/stats.clj +++ b/storm-core/src/clj/org/apache/storm/stats.clj @@ -21,9 +21,11 @@ ExecutorSpecificStats SpoutStats BoltStats ErrorInfo SupervisorSummary CommonAggregateStats ComponentAggregateStats ComponentPageInfo ComponentType BoltAggregateStats - ExecutorAggregateStats SpecificAggregateStats - SpoutAggregateStats TopologyPageInfo TopologyStats]) + ExecutorAggregateStats WorkerSummary SpecificAggregateStats + SpoutAggregateStats TopologyPageInfo TopologyStats + WorkerResources]) (:import [org.apache.storm.utils Utils]) + (:import [org.apache.storm.scheduler WorkerSlot]) (:import [org.apache.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric]) (:use [org.apache.storm log util]) (:use [clojure.math.numeric-tower :only [ceil]])) @@ -256,7 +258,6 @@ (.get_failed stats) (.get_complete_ms_avg stats)]) - (defn clojurify-executor-stats [^ExecutorStats stats] (let [ specific-stats (.get_specific stats) @@ -1002,15 +1003,62 @@ window->complete-latency) (.set_window_to_acked window->acked) (.set_window_to_failed window->failed)) - topo-page-info (doto (TopologyPageInfo. topology-id) - (.set_num_tasks num-tasks) - (.set_num_workers num-workers) - (.set_num_executors num-executors) - (.set_id_to_spout_agg_stats spout-agg-stats) - (.set_id_to_bolt_agg_stats bolt-agg-stats) - (.set_topology_stats topology-stats))] + topo-page-info (doto (TopologyPageInfo. topology-id) + (.set_num_tasks num-tasks) + (.set_num_workers num-workers) + (.set_num_executors num-executors) + (.set_id_to_spout_agg_stats spout-agg-stats) + (.set_id_to_bolt_agg_stats bolt-agg-stats) + (.set_topology_stats topology-stats))] topo-page-info)) +(defn agg-worker-stats + "Aggregate statistics per worker for a topology. Optionally filtering on specific supervisors." + ([storm-id topo-info worker->resources include-sys? user-authorized] + (agg-worker-stats storm-id topo-info worker->resources include-sys? user-authorized nil)) + ([storm-id topo-info worker->resources include-sys? user-authorized filter-supervisor] + (let [{:keys [storm-name + assignment + beats + task->component]} topo-info + exec->node+port (:executor->node+port assignment) + node->host (:node->host assignment) + all-node+port->exec (reverse-map exec->node+port) + node+port->exec (if (nil? filter-supervisor) + all-node+port->exec + (filter #(= filter-supervisor (ffirst %)) all-node+port->exec)) + handle-sys-components-fn (mk-include-sys-fn include-sys?)] + (dofor [[[node port] executors] node+port->exec] + (let [executor-tasks (map #(range (first %) (inc (last %))) executors) + worker-beats (vals (select-keys beats executors)) + not-null-worker-beat (first (filter identity worker-beats)) + worker-uptime (or (:uptime not-null-worker-beat) 0) + ;; list of components per executor ((c1 c2 c3) (c4) (c5)) + ;; if the executor was running only system components, an empty list for that executor is possible + components-per-executor (for [tasks executor-tasks] + (filter handle-sys-components-fn (map #(get task->component %) tasks))) + component->num-tasks (frequencies (flatten components-per-executor)) + num-executors (count executors) + default-worker-resources (WorkerResources.) + resources (if (nil? worker->resources) + default-worker-resources + (or (.get worker->resources (WorkerSlot. node port)) + default-worker-resources)) + worker-summary (doto + (WorkerSummary.) + (.set_host (node->host node)) + (.set_uptime_secs worker-uptime) + (.set_supervisor_id node) + (.set_port port) + (.set_topology_id storm-id) + (.set_topology_name storm-name) + (.set_num_executors num-executors) + (.set_assigned_memonheap (.get_mem_on_heap resources)) + (.set_assigned_memoffheap (.get_mem_off_heap resources)) + (.set_assigned_cpu (.get_cpu resources)))] + (if user-authorized (.set_component_to_num_tasks worker-summary component->num-tasks)) + worker-summary))))) + (defn agg-topo-execs-stats "Aggregate various executor statistics for a topology from the given heartbeats." http://git-wip-us.apache.org/repos/asf/storm/blob/0e0bcf27/storm-core/src/clj/org/apache/storm/ui/core.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj index 8b59aab..4cb01f9 100644 --- a/storm-core/src/clj/org/apache/storm/ui/core.clj +++ b/storm-core/src/clj/org/apache/storm/ui/core.clj @@ -38,7 +38,7 @@ TopologyStats CommonAggregateStats ComponentAggregateStats ComponentType BoltAggregateStats SpoutAggregateStats ExecutorAggregateStats SpecificAggregateStats ComponentPageInfo - LogConfig LogLevel LogLevelAction]) + LogConfig LogLevel LogLevelAction SupervisorPageInfo WorkerSummary]) (:import [org.apache.storm.security.auth AuthUtils ReqContext]) (:import [org.apache.storm.generated AuthorizationException ProfileRequest ProfileAction NodeInfo]) (:import [org.apache.storm.security.auth AuthUtils]) @@ -64,6 +64,7 @@ (defmeter ui:num-cluster-configuration-http-requests) (defmeter ui:num-cluster-summary-http-requests) (defmeter ui:num-nimbus-summary-http-requests) +(defmeter ui:num-supervisor-http-requests) (defmeter ui:num-supervisor-summary-http-requests) (defmeter ui:num-all-topologies-summary-http-requests) (defmeter ui:num-topology-page-http-requests) @@ -410,26 +411,77 @@ "nimbusUpTime" (pretty-uptime-sec uptime) "nimbusUpTimeSeconds" uptime}))}))) +(defn worker-summary-to-json + [secure? ^WorkerSummary worker-summary] + (let [host (.get_host worker-summary) + port (.get_port worker-summary) + topology-id (.get_topology_id worker-summary) + uptime-secs (.get_uptime_secs worker-summary)] + {"supervisorId" (.get_supervisor_id worker-summary) + "host" host + "port" port + "topologyId" topology-id + "topologyName" (.get_topology_name worker-summary) + "executorsTotal" (.get_num_executors worker-summary) + "assignedMemOnHeap" (.get_assigned_memonheap worker-summary) + "assignedMemOffHeap" (.get_assigned_memoffheap worker-summary) + "assignedCpu" (.get_assigned_cpu worker-summary) + "componentNumTasks" (.get_component_to_num_tasks worker-summary) + "uptime" (pretty-uptime-sec uptime-secs) + "uptimeSeconds" uptime-secs + "workerLogLink" (worker-log-link host port topology-id secure?)})) + +(defn supervisor-summary-to-json + [summary] + (let [slotsTotal (.get_num_workers summary) + slotsUsed (.get_num_used_workers summary) + slotsFree (max (- slotsTotal slotsUsed) 0) + totalMem (get (.get_total_resources summary) Config/SUPERVISOR_MEMORY_CAPACITY_MB) + totalCpu (get (.get_total_resources summary) Config/SUPERVISOR_CPU_CAPACITY) + usedMem (.get_used_mem summary) + usedCpu (.get_used_cpu summary) + availMem (max (- totalMem usedMem) 0) + availCpu (max (- totalCpu usedCpu) 0)] + {"id" (.get_supervisor_id summary) + "host" (.get_host summary) + "uptime" (pretty-uptime-sec (.get_uptime_secs summary)) + "uptimeSeconds" (.get_uptime_secs summary) + "slotsTotal" slotsTotal + "slotsUsed" slotsUsed + "slotsFree" slotsFree + "totalMem" totalMem + "totalCpu" totalCpu + "usedMem" usedMem + "usedCpu" usedCpu + "logLink" (supervisor-log-link (.get_host summary)) + "availMem" availMem + "availCpu" availCpu + "version" (.get_version summary)})) + +(defn supervisor-page-info + ([supervisor-id host include-sys? secure?] + (thrift/with-configured-nimbus-connection + nimbus (supervisor-page-info (.getSupervisorPageInfo ^Nimbus$Client nimbus + supervisor-id + host + include-sys?) secure?))) + ([^SupervisorPageInfo supervisor-page-info secure?] + ;; ask nimbus to return supervisor workers + any details user is allowed + ;; access on a per-topology basis (i.e. components) + (let [supervisors-json (map supervisor-summary-to-json (.get_supervisor_summaries supervisor-page-info))] + {"supervisors" supervisors-json + "schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE) + "workers" (into [] (for [^WorkerSummary worker-summary (.get_worker_summaries supervisor-page-info)] + (worker-summary-to-json secure? worker-summary)))}))) + (defn supervisor-summary ([] (thrift/with-configured-nimbus-connection nimbus (supervisor-summary (.get_supervisors (.getClusterInfo ^Nimbus$Client nimbus))))) ([summs] - {"supervisors" - (for [^SupervisorSummary s summs] - {"id" (.get_supervisor_id s) - "host" (.get_host s) - "uptime" (pretty-uptime-sec (.get_uptime_secs s)) - "uptimeSeconds" (.get_uptime_secs s) - "slotsTotal" (.get_num_workers s) - "slotsUsed" (.get_num_used_workers s) - "totalMem" (get (.get_total_resources s) Config/SUPERVISOR_MEMORY_CAPACITY_MB) - "totalCpu" (get (.get_total_resources s) Config/SUPERVISOR_CPU_CAPACITY) - "usedMem" (.get_used_mem s) - "usedCpu" (.get_used_cpu s) - "logLink" (supervisor-log-link (.get_host s)) - "version" (.get_version s)}) + {"supervisors" (for [^SupervisorSummary s summs] + (supervisor-summary-to-json s)) "schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)})) (defn all-topologies-summary @@ -588,6 +640,8 @@ "assignedTotalMem" (+ (.get_assigned_memonheap topo-info) (.get_assigned_memoffheap topo-info)) "assignedCpu" (.get_assigned_cpu topo-info) "topologyStats" topo-stats + "workers" (map (partial worker-summary-to-json secure?) + (.get_workers topo-info)) "spouts" (map (partial comp-agg-stats-json id secure?) (.get_id_to_spout_agg_stats topo-info)) "bolts" (map (partial comp-agg-stats-json id secure?) @@ -1046,6 +1100,16 @@ (assert-authorized-user "getClusterInfo") (json-response (assoc (supervisor-summary) "logviewerPort" (*STORM-CONF* LOGVIEWER-PORT)) (:callback m))) + (GET "/api/v1/supervisor" [:as {:keys [cookies servlet-request scheme]} & m] + (.mark ui:num-supervisor-http-requests) + (populate-context! servlet-request) + (assert-authorized-user "getSupervisorPageInfo") + ;; supervisor takes either an id or a host query parameter, and technically both + ;; that said, if both the id and host are provided, the id wins + (let [id (:id m) + host (:host m)] + (json-response (assoc (supervisor-page-info id host (check-include-sys? (:sys m)) (= scheme :https)) + "logviewerPort" (*STORM-CONF* LOGVIEWER-PORT)) (:callback m)))) (GET "/api/v1/topology/summary" [:as {:keys [cookies servlet-request]} & m] (mark! ui:num-all-topologies-summary-http-requests) (populate-context! servlet-request) http://git-wip-us.apache.org/repos/asf/storm/blob/0e0bcf27/storm-core/src/jvm/org/apache/storm/generated/Assignment.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/generated/Assignment.java b/storm-core/src/jvm/org/apache/storm/generated/Assignment.java index c7a3f8a..90b7516 100644 --- a/storm-core/src/jvm/org/apache/storm/generated/Assignment.java +++ b/storm-core/src/jvm/org/apache/storm/generated/Assignment.java @@ -787,15 +787,15 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen case 2: // NODE_HOST if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { - org.apache.thrift.protocol.TMap _map548 = iprot.readMapBegin(); - struct.node_host = new HashMap<String,String>(2*_map548.size); - String _key549; - String _val550; - for (int _i551 = 0; _i551 < _map548.size; ++_i551) + org.apache.thrift.protocol.TMap _map582 = iprot.readMapBegin(); + struct.node_host = new HashMap<String,String>(2*_map582.size); + String _key583; + String _val584; + for (int _i585 = 0; _i585 < _map582.size; ++_i585) { - _key549 = iprot.readString(); - _val550 = iprot.readString(); - struct.node_host.put(_key549, _val550); + _key583 = iprot.readString(); + _val584 = iprot.readString(); + struct.node_host.put(_key583, _val584); } iprot.readMapEnd(); } @@ -807,26 +807,26 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen case 3: // EXECUTOR_NODE_PORT if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { - org.apache.thrift.protocol.TMap _map552 = iprot.readMapBegin(); - struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map552.size); - List<Long> _key553; - NodeInfo _val554; - for (int _i555 = 0; _i555 < _map552.size; ++_i555) + org.apache.thrift.protocol.TMap _map586 = iprot.readMapBegin(); + struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map586.size); + List<Long> _key587; + NodeInfo _val588; + for (int _i589 = 0; _i589 < _map586.size; ++_i589) { { - org.apache.thrift.protocol.TList _list556 = iprot.readListBegin(); - _key553 = new ArrayList<Long>(_list556.size); - long _elem557; - for (int _i558 = 0; _i558 < _list556.size; ++_i558) + org.apache.thrift.protocol.TList _list590 = iprot.readListBegin(); + _key587 = new ArrayList<Long>(_list590.size); + long _elem591; + for (int _i592 = 0; _i592 < _list590.size; ++_i592) { - _elem557 = iprot.readI64(); - _key553.add(_elem557); + _elem591 = iprot.readI64(); + _key587.add(_elem591); } iprot.readListEnd(); } - _val554 = new NodeInfo(); - _val554.read(iprot); - struct.executor_node_port.put(_key553, _val554); + _val588 = new NodeInfo(); + _val588.read(iprot); + struct.executor_node_port.put(_key587, _val588); } iprot.readMapEnd(); } @@ -838,25 +838,25 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen case 4: // EXECUTOR_START_TIME_SECS if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { - org.apache.thrift.protocol.TMap _map559 = iprot.readMapBegin(); - struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map559.size); - List<Long> _key560; - long _val561; - for (int _i562 = 0; _i562 < _map559.size; ++_i562) + org.apache.thrift.protocol.TMap _map593 = iprot.readMapBegin(); + struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map593.size); + List<Long> _key594; + long _val595; + for (int _i596 = 0; _i596 < _map593.size; ++_i596) { { - org.apache.thrift.protocol.TList _list563 = iprot.readListBegin(); - _key560 = new ArrayList<Long>(_list563.size); - long _elem564; - for (int _i565 = 0; _i565 < _list563.size; ++_i565) + org.apache.thrift.protocol.TList _list597 = iprot.readListBegin(); + _key594 = new ArrayList<Long>(_list597.size); + long _elem598; + for (int _i599 = 0; _i599 < _list597.size; ++_i599) { - _elem564 = iprot.readI64(); - _key560.add(_elem564); + _elem598 = iprot.readI64(); + _key594.add(_elem598); } iprot.readListEnd(); } - _val561 = iprot.readI64(); - struct.executor_start_time_secs.put(_key560, _val561); + _val595 = iprot.readI64(); + struct.executor_start_time_secs.put(_key594, _val595); } iprot.readMapEnd(); } @@ -868,17 +868,17 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen case 5: // WORKER_RESOURCES if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { - org.apache.thrift.protocol.TMap _map566 = iprot.readMapBegin(); - struct.worker_resources = new HashMap<NodeInfo,WorkerResources>(2*_map566.size); - NodeInfo _key567; - WorkerResources _val568; - for (int _i569 = 0; _i569 < _map566.size; ++_i569) + org.apache.thrift.protocol.TMap _map600 = iprot.readMapBegin(); + struct.worker_resources = new HashMap<NodeInfo,WorkerResources>(2*_map600.size); + NodeInfo _key601; + WorkerResources _val602; + for (int _i603 = 0; _i603 < _map600.size; ++_i603) { - _key567 = new NodeInfo(); - _key567.read(iprot); - _val568 = new WorkerResources(); - _val568.read(iprot); - struct.worker_resources.put(_key567, _val568); + _key601 = new NodeInfo(); + _key601.read(iprot); + _val602 = new WorkerResources(); + _val602.read(iprot); + struct.worker_resources.put(_key601, _val602); } iprot.readMapEnd(); } @@ -910,10 +910,10 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen oprot.writeFieldBegin(NODE_HOST_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, struct.node_host.size())); - for (Map.Entry<String, String> _iter570 : struct.node_host.entrySet()) + for (Map.Entry<String, String> _iter604 : struct.node_host.entrySet()) { - oprot.writeString(_iter570.getKey()); - oprot.writeString(_iter570.getValue()); + oprot.writeString(_iter604.getKey()); + oprot.writeString(_iter604.getValue()); } oprot.writeMapEnd(); } @@ -925,17 +925,17 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen oprot.writeFieldBegin(EXECUTOR_NODE_PORT_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, struct.executor_node_port.size())); - for (Map.Entry<List<Long>, NodeInfo> _iter571 : struct.executor_node_port.entrySet()) + for (Map.Entry<List<Long>, NodeInfo> _iter605 : struct.executor_node_port.entrySet()) { { - oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter571.getKey().size())); - for (long _iter572 : _iter571.getKey()) + oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter605.getKey().size())); + for (long _iter606 : _iter605.getKey()) { - oprot.writeI64(_iter572); + oprot.writeI64(_iter606); } oprot.writeListEnd(); } - _iter571.getValue().write(oprot); + _iter605.getValue().write(oprot); } oprot.writeMapEnd(); } @@ -947,17 +947,17 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen oprot.writeFieldBegin(EXECUTOR_START_TIME_SECS_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, struct.executor_start_time_secs.size())); - for (Map.Entry<List<Long>, Long> _iter573 : struct.executor_start_time_secs.entrySet()) + for (Map.Entry<List<Long>, Long> _iter607 : struct.executor_start_time_secs.entrySet()) { { - oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter573.getKey().size())); - for (long _iter574 : _iter573.getKey()) + oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter607.getKey().size())); + for (long _iter608 : _iter607.getKey()) { - oprot.writeI64(_iter574); + oprot.writeI64(_iter608); } oprot.writeListEnd(); } - oprot.writeI64(_iter573.getValue()); + oprot.writeI64(_iter607.getValue()); } oprot.writeMapEnd(); } @@ -969,10 +969,10 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen oprot.writeFieldBegin(WORKER_RESOURCES_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.STRUCT, struct.worker_resources.size())); - for (Map.Entry<NodeInfo, WorkerResources> _iter575 : struct.worker_resources.entrySet()) + for (Map.Entry<NodeInfo, WorkerResources> _iter609 : struct.worker_resources.entrySet()) { - _iter575.getKey().write(oprot); - _iter575.getValue().write(oprot); + _iter609.getKey().write(oprot); + _iter609.getValue().write(oprot); } oprot.writeMapEnd(); } @@ -1014,52 +1014,52 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen if (struct.is_set_node_host()) { { oprot.writeI32(struct.node_host.size()); - for (Map.Entry<String, String> _iter576 : struct.node_host.entrySet()) + for (Map.Entry<String, String> _iter610 : struct.node_host.entrySet()) { - oprot.writeString(_iter576.getKey()); - oprot.writeString(_iter576.getValue()); + oprot.writeString(_iter610.getKey()); + oprot.writeString(_iter610.getValue()); } } } if (struct.is_set_executor_node_port()) { { oprot.writeI32(struct.executor_node_port.size()); - for (Map.Entry<List<Long>, NodeInfo> _iter577 : struct.executor_node_port.entrySet()) + for (Map.Entry<List<Long>, NodeInfo> _iter611 : struct.executor_node_port.entrySet()) { { - oprot.writeI32(_iter577.getKey().size()); - for (long _iter578 : _iter577.getKey()) + oprot.writeI32(_iter611.getKey().size()); + for (long _iter612 : _iter611.getKey()) { - oprot.writeI64(_iter578); + oprot.writeI64(_iter612); } } - _iter577.getValue().write(oprot); + _iter611.getValue().write(oprot); } } } if (struct.is_set_executor_start_time_secs()) { { oprot.writeI32(struct.executor_start_time_secs.size()); - for (Map.Entry<List<Long>, Long> _iter579 : struct.executor_start_time_secs.entrySet()) + for (Map.Entry<List<Long>, Long> _iter613 : struct.executor_start_time_secs.entrySet()) { { - oprot.writeI32(_iter579.getKey().size()); - for (long _iter580 : _iter579.getKey()) + oprot.writeI32(_iter613.getKey().size()); + for (long _iter614 : _iter613.getKey()) { - oprot.writeI64(_iter580); + oprot.writeI64(_iter614); } } - oprot.writeI64(_iter579.getValue()); + oprot.writeI64(_iter613.getValue()); } } } if (struct.is_set_worker_resources()) { { oprot.writeI32(struct.worker_resources.size()); - for (Map.Entry<NodeInfo, WorkerResources> _iter581 : struct.worker_resources.entrySet()) + for (Map.Entry<NodeInfo, WorkerResources> _iter615 : struct.worker_resources.entrySet()) { - _iter581.getKey().write(oprot); - _iter581.getValue().write(oprot); + _iter615.getKey().write(oprot); + _iter615.getValue().write(oprot); } } } @@ -1073,81 +1073,81 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen BitSet incoming = iprot.readBitSet(4); if (incoming.get(0)) { { - org.apache.thrift.protocol.TMap _map582 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32()); - struct.node_host = new HashMap<String,String>(2*_map582.size); - String _key583; - String _val584; - for (int _i585 = 0; _i585 < _map582.size; ++_i585) + org.apache.thrift.protocol.TMap _map616 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32()); + struct.node_host = new HashMap<String,String>(2*_map616.size); + String _key617; + String _val618; + for (int _i619 = 0; _i619 < _map616.size; ++_i619) { - _key583 = iprot.readString(); - _val584 = iprot.readString(); - struct.node_host.put(_key583, _val584); + _key617 = iprot.readString(); + _val618 = iprot.readString(); + struct.node_host.put(_key617, _val618); } } struct.set_node_host_isSet(true); } if (incoming.get(1)) { { - org.apache.thrift.protocol.TMap _map586 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); - struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map586.size); - List<Long> _key587; - NodeInfo _val588; - for (int _i589 = 0; _i589 < _map586.size; ++_i589) + org.apache.thrift.protocol.TMap _map620 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); + struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map620.size); + List<Long> _key621; + NodeInfo _val622; + for (int _i623 = 0; _i623 < _map620.size; ++_i623) { { - org.apache.thrift.protocol.TList _list590 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32()); - _key587 = new ArrayList<Long>(_list590.size); - long _elem591; - for (int _i592 = 0; _i592 < _list590.size; ++_i592) + org.apache.thrift.protocol.TList _list624 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32()); + _key621 = new ArrayList<Long>(_list624.size); + long _elem625; + for (int _i626 = 0; _i626 < _list624.size; ++_i626) { - _elem591 = iprot.readI64(); - _key587.add(_elem591); + _elem625 = iprot.readI64(); + _key621.add(_elem625); } } - _val588 = new NodeInfo(); - _val588.read(iprot); - struct.executor_node_port.put(_key587, _val588); + _val622 = new NodeInfo(); + _val622.read(iprot); + struct.executor_node_port.put(_key621, _val622); } } struct.set_executor_node_port_isSet(true); } if (incoming.get(2)) { { - org.apache.thrift.protocol.TMap _map593 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, iprot.readI32()); - struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map593.size); - List<Long> _key594; - long _val595; - for (int _i596 = 0; _i596 < _map593.size; ++_i596) + org.apache.thrift.protocol.TMap _map627 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, iprot.readI32()); + struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map627.size); + List<Long> _key628; + long _val629; + for (int _i630 = 0; _i630 < _map627.size; ++_i630) { { - org.apache.thrift.protocol.TList _list597 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32()); - _key594 = new ArrayList<Long>(_list597.size); - long _elem598; - for (int _i599 = 0; _i599 < _list597.size; ++_i599) + org.apache.thrift.protocol.TList _list631 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32()); + _key628 = new ArrayList<Long>(_list631.size); + long _elem632; + for (int _i633 = 0; _i633 < _list631.size; ++_i633) { - _elem598 = iprot.readI64(); - _key594.add(_elem598); + _elem632 = iprot.readI64(); + _key628.add(_elem632); } } - _val595 = iprot.readI64(); - struct.executor_start_time_secs.put(_key594, _val595); + _val629 = iprot.readI64(); + struct.executor_start_time_secs.put(_key628, _val629); } } struct.set_executor_start_time_secs_isSet(true); } if (incoming.get(3)) { { - org.apache.thrift.protocol.TMap _map600 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); - struct.worker_resources = new HashMap<NodeInfo,WorkerResources>(2*_map600.size); - NodeInfo _key601; - WorkerResources _val602; - for (int _i603 = 0; _i603 < _map600.size; ++_i603) + org.apache.thrift.protocol.TMap _map634 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); + struct.worker_resources = new HashMap<NodeInfo,WorkerResources>(2*_map634.size); + NodeInfo _key635; + WorkerResources _val636; + for (int _i637 = 0; _i637 < _map634.size; ++_i637) { - _key601 = new NodeInfo(); - _key601.read(iprot); - _val602 = new WorkerResources(); - _val602.read(iprot); - struct.worker_resources.put(_key601, _val602); + _key635 = new NodeInfo(); + _key635.read(iprot); + _val636 = new WorkerResources(); + _val636.read(iprot); + struct.worker_resources.put(_key635, _val636); } } struct.set_worker_resources_isSet(true); http://git-wip-us.apache.org/repos/asf/storm/blob/0e0bcf27/storm-core/src/jvm/org/apache/storm/generated/ClusterWorkerHeartbeat.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/generated/ClusterWorkerHeartbeat.java b/storm-core/src/jvm/org/apache/storm/generated/ClusterWorkerHeartbeat.java index 8585a7d..59c0894 100644 --- a/storm-core/src/jvm/org/apache/storm/generated/ClusterWorkerHeartbeat.java +++ b/storm-core/src/jvm/org/apache/storm/generated/ClusterWorkerHeartbeat.java @@ -635,17 +635,17 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo case 2: // EXECUTOR_STATS if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { - org.apache.thrift.protocol.TMap _map624 = iprot.readMapBegin(); - struct.executor_stats = new HashMap<ExecutorInfo,ExecutorStats>(2*_map624.size); - ExecutorInfo _key625; - ExecutorStats _val626; - for (int _i627 = 0; _i627 < _map624.size; ++_i627) + org.apache.thrift.protocol.TMap _map658 = iprot.readMapBegin(); + struct.executor_stats = new HashMap<ExecutorInfo,ExecutorStats>(2*_map658.size); + ExecutorInfo _key659; + ExecutorStats _val660; + for (int _i661 = 0; _i661 < _map658.size; ++_i661) { - _key625 = new ExecutorInfo(); - _key625.read(iprot); - _val626 = new ExecutorStats(); - _val626.read(iprot); - struct.executor_stats.put(_key625, _val626); + _key659 = new ExecutorInfo(); + _key659.read(iprot); + _val660 = new ExecutorStats(); + _val660.read(iprot); + struct.executor_stats.put(_key659, _val660); } iprot.readMapEnd(); } @@ -692,10 +692,10 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo oprot.writeFieldBegin(EXECUTOR_STATS_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.STRUCT, struct.executor_stats.size())); - for (Map.Entry<ExecutorInfo, ExecutorStats> _iter628 : struct.executor_stats.entrySet()) + for (Map.Entry<ExecutorInfo, ExecutorStats> _iter662 : struct.executor_stats.entrySet()) { - _iter628.getKey().write(oprot); - _iter628.getValue().write(oprot); + _iter662.getKey().write(oprot); + _iter662.getValue().write(oprot); } oprot.writeMapEnd(); } @@ -727,10 +727,10 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo oprot.writeString(struct.storm_id); { oprot.writeI32(struct.executor_stats.size()); - for (Map.Entry<ExecutorInfo, ExecutorStats> _iter629 : struct.executor_stats.entrySet()) + for (Map.Entry<ExecutorInfo, ExecutorStats> _iter663 : struct.executor_stats.entrySet()) { - _iter629.getKey().write(oprot); - _iter629.getValue().write(oprot); + _iter663.getKey().write(oprot); + _iter663.getValue().write(oprot); } } oprot.writeI32(struct.time_secs); @@ -743,17 +743,17 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo struct.storm_id = iprot.readString(); struct.set_storm_id_isSet(true); { - org.apache.thrift.protocol.TMap _map630 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); - struct.executor_stats = new HashMap<ExecutorInfo,ExecutorStats>(2*_map630.size); - ExecutorInfo _key631; - ExecutorStats _val632; - for (int _i633 = 0; _i633 < _map630.size; ++_i633) + org.apache.thrift.protocol.TMap _map664 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); + struct.executor_stats = new HashMap<ExecutorInfo,ExecutorStats>(2*_map664.size); + ExecutorInfo _key665; + ExecutorStats _val666; + for (int _i667 = 0; _i667 < _map664.size; ++_i667) { - _key631 = new ExecutorInfo(); - _key631.read(iprot); - _val632 = new ExecutorStats(); - _val632.read(iprot); - struct.executor_stats.put(_key631, _val632); + _key665 = new ExecutorInfo(); + _key665.read(iprot); + _val666 = new ExecutorStats(); + _val666.read(iprot); + struct.executor_stats.put(_key665, _val666); } } struct.set_executor_stats_isSet(true);
