Repository: storm
Updated Branches:
  refs/heads/1.x-branch 5021708fc -> e99961311

STORM-1719 Introduce REST API: Topology metric stats for stream

* Path: /api/v1/topology/:id/metrics
* This API provides detailed metrics for topology
** shows metrics per component, which are aggregated by stream
* add documentation about new REST API
** please refer docs/STORM-UI-REST-API.md for details


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a1c64714
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a1c64714
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a1c64714

Branch: refs/heads/1.x-branch
Commit: a1c647142816ffb92bbaa6acbf234fdd83c3441e
Parents: 3a4825e
Author: Jungtaek Lim
Authored: Sat Apr 16 20:04:01 2016 +0900
Committer: Jungtaek Lim
Committed: Mon Apr 18 15:02:39 2016 +0900

----------------------------------------------------------------------
 docs/STORM-UI-REST-API.md                       | 276 +++++++++++++++++++
 storm-core/src/clj/org/apache/storm/ui/core.clj |  98 +++++++
 2 files changed, 374 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a1c64714/docs/STORM-UI-REST-API.md
----------------------------------------------------------------------
diff --git a/docs/STORM-UI-REST-API.md b/docs/STORM-UI-REST-API.md
index bbed956..a2fc39a 100644
--- a/docs/STORM-UI-REST-API.md
+++ b/docs/STORM-UI-REST-API.md
@@ -510,6 +510,282 @@ Sample response:
 }
 ```
 
+### /api/v1/topology/:id/metrics
+
+Returns detailed metrics for a topology: metrics are shown per component and aggregated by stream.
+
+|Parameter |Value |Description |
+|----------|--------|-------------|
+|id |String (required)| Topology Id |
+|window |String. Default value :all-time| window duration for metrics in seconds|
+|sys |String. Values 1 or 0. Default value 0| Controls whether sys stats are included in the response|
+
+Response fields:
+
+|Field |Value |Description|
+|--- |--- |---|
+|window |String. Default value ":all-time" | window duration for metrics in seconds|
+|window-hint| String | window param value in "hh mm ss" format. Default value is "All time"|
+|spouts| Array | Array of all the spout components in the topology|
+|spouts.id| String |Spout id|
+|spouts.emitted| Array | Array of all the output streams on which this spout emits messages |
+|spouts.emitted.stream_id| String | Stream id for this stream |
+|spouts.emitted.value| Long | Number of messages emitted in the given window|
+|spouts.transferred | Array | Array of all the output streams on which this spout transfers messages |
+|spouts.transferred.stream_id| String | Stream id for this stream |
+|spouts.transferred.value| Long |Number of messages transferred in the given window|
+|spouts.acked| Array | Array of all the output streams for which this spout receives message acks |
+|spouts.acked.stream_id| String | Stream id for this stream |
+|spouts.acked.value| Long |Number of messages acked in the given window|
+|spouts.failed| Array | Array of all the output streams for which this spout receives message failures |
+|spouts.failed.stream_id| String | Stream id for this stream |
+|spouts.failed.value| Long |Number of messages failed in the given window|
+|spouts.complete_ms_avg| Array | Array of all the output streams for which this spout receives message acks |
+|spouts.complete_ms_avg.stream_id| String | Stream id for this stream |
+|spouts.complete_ms_avg.value| String (double value returned in String format) | Average total latency, in milliseconds, for fully processing a message|
+|bolts| Array | Array of all the bolt components in the topology|
+|bolts.id| String |Bolt id|
+|bolts.emitted| Array | Array of all the output streams on which this bolt emits messages |
+|bolts.emitted.stream_id| String | Stream id for this stream |
+|bolts.emitted.value| Long | Number of messages emitted in the given window|
+|bolts.transferred | Array | Array of all the output streams on which this bolt transfers messages |
+|bolts.transferred.stream_id| String | Stream id for this stream |
+|bolts.transferred.value| Long |Number of messages transferred in the given window|
+|bolts.acked| Array | Array of all the input streams on which this bolt acks messages |
+|bolts.acked.component_id| String | Id of the component that emits this stream |
+|bolts.acked.stream_id| String | Stream id for this stream |
+|bolts.acked.value| Long |Number of messages acked in the given window|
+|bolts.failed| Array | Array of all the input streams on which this bolt fails messages |
+|bolts.failed.component_id| String | Id of the component that emits this stream |
+|bolts.failed.stream_id| String | Stream id for this stream |
+|bolts.failed.value| Long |Number of messages failed in the given window|
+|bolts.process_ms_avg| Array | Array of all the input streams on which this bolt acks messages |
+|bolts.process_ms_avg.component_id| String | Id of the component that emits this stream |
+|bolts.process_ms_avg.stream_id| String | Stream id for this stream |
+|bolts.process_ms_avg.value| String (double value returned in String format) |Average time of the bolt to ack a message after it was received|
+|bolts.executed| Array | Array of all the input streams on which this bolt executes messages |
+|bolts.executed.component_id| String | Id of the component that emits this stream |
+|bolts.executed.stream_id| String | Stream id for this stream |
+|bolts.executed.value| Long |Number of messages executed in the given window|
+|bolts.executed_ms_avg| Array | Array of all the input streams on which this bolt executes messages |
+|bolts.executed_ms_avg.component_id| String | Id of the component that emits this stream |
+|bolts.executed_ms_avg.stream_id| String | Stream id for this stream |
+|bolts.executed_ms_avg.value| String (double value returned in String format) | Average time to run the execute method of the bolt|
+
+Examples:
+
+```no-highlight
+1. http://ui-daemon-host-name:8080/api/v1/topology/WordCount3-1-1402960825/metrics
+2. http://ui-daemon-host-name:8080/api/v1/topology/WordCount3-1-1402960825/metrics?sys=1
+3. http://ui-daemon-host-name:8080/api/v1/topology/WordCount3-1-1402960825/metrics?window=600
+```
+
+Sample response:
+
+```json
+{
+   "window":":all-time",
+   "window-hint":"All time",
+   "spouts":[
+      {
+         "id":"spout",
+         "emitted":[
+            {
+               "stream_id":"__metrics",
+               "value":20
+            },
+            {
+               "stream_id":"default",
+               "value":17350280
+            },
+            {
+               "stream_id":"__ack_init",
+               "value":17328160
+            },
+            {
+               "stream_id":"__system",
+               "value":20
+            }
+         ],
+         "transferred":[
+            {
+               "stream_id":"__metrics",
+               "value":20
+            },
+            {
+               "stream_id":"default",
+               "value":17350280
+            },
+            {
+               "stream_id":"__ack_init",
+               "value":17328160
+            },
+            {
+               "stream_id":"__system",
+               "value":0
+            }
+         ],
+         "acked":[
+            {
+               "stream_id":"default",
+               "value":17339180
+            }
+         ],
+         "failed":[
+
+         ],
+         "complete_ms_avg":[
+            {
+               "stream_id":"default",
+               "value":"920.497"
+            }
+         ]
+      }
+   ],
+   "bolts":[
+      {
+         "id":"count",
+         "emitted":[
+            {
+               "stream_id":"__metrics",
+               "value":120
+            },
+            {
+               "stream_id":"default",
+               "value":190748180
+            },
+            {
+               "stream_id":"__ack_ack",
+               "value":190718100
+            },
+            {
+               "stream_id":"__system",
+               "value":20
+            }
+         ],
+         "transferred":[
+            {
+               "stream_id":"__metrics",
+               "value":120
+            },
+            {
+               "stream_id":"default",
+               "value":0
+            },
+            {
+               "stream_id":"__ack_ack",
+               "value":190718100
+            },
+            {
+               "stream_id":"__system",
+               "value":0
+            }
+         ],
+         "acked":[
+            {
+               "component_id":"split",
+               "stream_id":"default",
+               "value":190733160
+            }
+         ],
+         "failed":[
+
+         ],
+         "process_ms_avg":[
+            {
+               "component_id":"split",
+               "stream_id":"default",
+               "value":"0.004"
+            }
+         ],
+         "executed":[
+            {
+               "component_id":"split",
+               "stream_id":"default",
+               "value":190733140
+            }
+         ],
+         "executed_ms_avg":[
+            {
+               "component_id":"split",
+               "stream_id":"default",
+               "value":"0.005"
+            }
+         ]
+      },
+      {
+         "id":"split",
+         "emitted":[
+            {
+               "stream_id":"__metrics",
+               "value":60
+            },
+            {
+               "stream_id":"default",
+               "value":190754740
+            },
+            {
+               "stream_id":"__ack_ack",
+               "value":17317580
+            },
+            {
+               "stream_id":"__system",
+               "value":20
+            }
+         ],
+         "transferred":[
+            {
+               "stream_id":"__metrics",
+               "value":60
+            },
+            {
+               "stream_id":"default",
+               "value":190754740
+            },
+            {
+               "stream_id":"__ack_ack",
+               "value":17317580
+            },
+            {
+               "stream_id":"__system",
+               "value":0
+            }
+         ],
+         "acked":[
+            {
+               "component_id":"spout",
+               "stream_id":"default",
+               "value":17339180
+            }
+         ],
+         "failed":[
+
+         ],
+         "process_ms_avg":[
+            {
+               "component_id":"spout",
+               "stream_id":"default",
+               "value":"0.051"
+            }
+         ],
+         "executed":[
+            {
+               "component_id":"spout",
+               "stream_id":"default",
+               "value":17339240
+            }
+         ],
+         "executed_ms_avg":[
+            {
+               "component_id":"spout",
+               "stream_id":"default",
+               "value":"0.052"
+            }
+         ]
+      }
+   ]
+}
+```
 
 ### /api/v1/topology/:id/component/:component (GET)


http://git-wip-us.apache.org/repos/asf/storm/blob/a1c64714/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index c35e051..0bcb492 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -65,6 +65,7 @@
 (defmeter ui:num-supervisor-summary-http-requests)
 (defmeter ui:num-all-topologies-summary-http-requests)
 (defmeter ui:num-topology-page-http-requests)
+(defmeter ui:num-topology-metric-http-requests)
 (defmeter ui:num-build-visualization-http-requests)
 (defmeter ui:num-mk-visualization-data-http-requests)
 (defmeter ui:num-component-page-http-requests)
@@ -631,6 +632,98 @@
        "visualizationTable" []
        "schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)}))))
 
+(defn- average
+  [vals]
+  (/ (reduce + vals) (count vals)))
+
+(defn- merge-with-conj [& mlist]
+  (let [flatten-keys (set (filter identity (flatten (map keys mlist))))
+        dict-keys-with-empty-list (zipmap flatten-keys (repeat '()))]
+    (apply merge-with conj dict-keys-with-empty-list mlist)))
+
+(defn- conj-specific-stats-by-field [window field-fn stats]
+  (apply merge-with-conj (map #(into {} (.get (field-fn %) window)) stats)))
+
+(defn- reduce-conj-specific-stats-by-field [agg-val-fn stats-map]
+  (let [fn-key-to-str (fn [key]
+                        (condp instance? key
+                          String {"stream_id" key}
+                          GlobalStreamId {"component_id" (.get_componentId key) "stream_id" (.get_streamId key)}
+                          {"stream_id" (str key)}))]
+    (reduce-kv #(conj %1 (merge (fn-key-to-str %2) {"value" (agg-val-fn %3)})) '() stats-map)))
+
+(defn- merge-stats-specific-field-by-stream
+  [window field-fn agg-val-fn stats]
+  (reduce-conj-specific-stats-by-field agg-val-fn
+    (conj-specific-stats-by-field window field-fn stats)))
+
+(defn- merge-executor-common-stats
+  [window executor-stats]
+  {"emitted"
+   (merge-stats-specific-field-by-stream window #(.get_emitted %) sum executor-stats)
+   "transferred"
+   (merge-stats-specific-field-by-stream window #(.get_transferred %) sum executor-stats)
+   })
+
+(defmulti merge-executor-specific-stats
+  (fn [_ specific-stats]
+    (if (.is_set_spout (first specific-stats)) :spout :bolt)))
+
+(defmethod merge-executor-specific-stats :spout
+  [window specific-stats]
+  (let [stats (map #(.get_spout %) specific-stats)]
+    {"acked"
+     (merge-stats-specific-field-by-stream window #(.get_acked %) sum stats)
+     "failed"
+     (merge-stats-specific-field-by-stream window #(.get_failed %) sum stats)
+     "complete_ms_avg"
+     (merge-stats-specific-field-by-stream window #(.get_complete_ms_avg %) #(float-str (average %)) stats)
+     }
+    ))
+
+(defmethod merge-executor-specific-stats :bolt
+  [window specific-stats]
+  (let [stats (map #(.get_bolt %) specific-stats)]
+    {"acked"
+     (merge-stats-specific-field-by-stream window #(.get_acked %) sum stats)
+     "failed"
+     (merge-stats-specific-field-by-stream window #(.get_failed %) sum stats)
+     "process_ms_avg"
+     (merge-stats-specific-field-by-stream window #(.get_process_ms_avg %) #(float-str (average %)) stats)
+     "executed"
+     (merge-stats-specific-field-by-stream window #(.get_executed %) sum stats)
+     "executed_ms_avg"
+     (merge-stats-specific-field-by-stream window #(.get_execute_ms_avg %) #(float-str (average %)) stats)
+     }
+    ))
+
+(defn merge-executor-stats [window component-id eslist]
+  (let [stats (map #(.get_stats %) eslist)
+        specific-stats (map #(.get_specific %) stats)]
+    (merge {"id" component-id}
+      (merge-executor-common-stats window stats)
+      (merge-executor-specific-stats window specific-stats))))
+
+(defn topology-metrics-page [id window include-sys?]
+  (thrift/with-configured-nimbus-connection nimbus
+    (let [window (if window window ":all-time")
+          window-hint (window-hint window)
+          topology (.getTopology ^Nimbus$Client nimbus id)
+          summ (->> (doto
+                      (GetInfoOptions.)
+                      (.set_num_err_choice NumErrorsChoice/NONE))
+                    (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
+          execs (.get_executors summ)
+          spout-summs (filter (partial spout-summary? topology) execs)
+          bolt-summs (filter (partial bolt-summary? topology) execs)
+          spout-comp-summs (group-by-comp spout-summs)
+          bolt-comp-summs (group-by-comp bolt-summs)
+          bolt-comp-summs (filter-key (mk-include-sys-fn include-sys?)
+                                      bolt-comp-summs)
+          merged-spout-stats (map (fn [[k v]] (merge-executor-stats window k v)) spout-comp-summs)
+          merged-bolt-stats (map (fn [[k v]] (merge-executor-stats window k v)) bolt-comp-summs)]
+      (merge {"window" window "window-hint" window-hint "spouts" merged-spout-stats "bolts" merged-bolt-stats}))))
+
 (defn component-errors
   [errors-list topology-id secure?]
   (let [errors (->> errors-list
@@ -965,6 +1058,11 @@
     (assert-authorized-user "getTopology" (topology-config id))
     (let [user (get-user-name servlet-request)]
       (json-response (topology-page id (:window m) (check-include-sys? (:sys m)) user (= scheme :https)) (:callback m))))
+  (GET "/api/v1/topology/:id/metrics" [:as {:keys [cookies servlet-request]} id & m]
+    (mark! ui:num-topology-metric-http-requests)
+    (populate-context! servlet-request)
+    (assert-authorized-user "getTopology" (topology-config id))
+    (json-response (topology-metrics-page id (:window m) (check-include-sys? (:sys m))) (:callback m)))
   (GET "/api/v1/topology/:id/visualization-init" [:as {:keys [cookies servlet-request]} id & m]
     (mark! ui:num-build-visualization-http-requests)
     (populate-context! servlet-request)
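
The per-stream aggregation performed by `merge-stats-specific-field-by-stream` may be easier to follow outside Clojure. Below is a minimal Python sketch of the same idea: collect each executor's value per stream key, then reduce each list with `sum` for counts or with an average for latencies. The sketch assumes that each executor's stats for a single field arrive as a dict keyed first by window and then by stream key. A hypothetical `(component_id, stream_id)` tuple stands in for Thrift's `GlobalStreamId`. The names `merge_by_stream`, `key_to_fields` and `avg_str` are illustrative only. The three-decimal formatting in `avg_str` is inferred from the sample response and is not a copy of `float-str`.

```python
from collections import defaultdict
from statistics import mean
from typing import Callable, Dict, List, Tuple, Union

# Hypothetical stand-in for Thrift's GlobalStreamId: (component_id, stream_id).
# Plain strings are output stream ids, as used for emitted/transferred.
StreamKey = Union[str, Tuple[str, str]]
# One executor's stats for a single field: window -> stream key -> value.
ExecutorField = Dict[str, Dict[StreamKey, float]]


def key_to_fields(key: StreamKey) -> dict:
    """Turn a stream key into the JSON fields used in the response."""
    if isinstance(key, tuple):
        component_id, stream_id = key
        return {"component_id": component_id, "stream_id": stream_id}
    return {"stream_id": str(key)}


def merge_by_stream(window: str,
                    executors: List[ExecutorField],
                    agg: Callable[[List[float]], object]) -> List[dict]:
    """Group values by stream key across executors, then aggregate each group."""
    grouped: Dict[StreamKey, List[float]] = defaultdict(list)
    for stats in executors:
        # A missing window contributes nothing, like (into {} nil) in Clojure.
        for key, value in stats.get(window, {}).items():
            grouped[key].append(value)
    return [{**key_to_fields(k), "value": agg(vals)} for k, vals in grouped.items()]


def avg_str(vals: List[float]) -> str:
    """Unweighted mean of per-executor averages, as a 3-decimal string."""
    return "%.3f" % mean(vals)


if __name__ == "__main__":
    emitted = [
        {":all-time": {"default": 100, "__metrics": 2}},
        {":all-time": {"default": 50}},
    ]
    # [{'stream_id': 'default', 'value': 150}, {'stream_id': '__metrics', 'value': 2}]
    print(merge_by_stream(":all-time", emitted, sum))
    latency = [
        {":all-time": {("split", "default"): 1.0}},
        {":all-time": {("split", "default"): 2.0}},
    ]
    # [{'component_id': 'split', 'stream_id': 'default', 'value': '1.500'}]
    print(merge_by_stream(":all-time", latency, avg_str))
```

`avg_str` follows the Clojure `average` helper and takes an unweighted mean of the per-executor averages. As a result, an executor that processed only a few tuples counts as much as a busy one. Neither version guarantees any particular order for the per-stream entries.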

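A client for the `/api/v1/topology/:id/metrics` endpoint needs little more than an HTTP GET and a JSON decoder. The sketch below uses only the Python standard library. The helper names `metrics_url`, `fetch_metrics` and `values_by_stream` are hypothetical. The base URL is the placeholder host from the documentation examples. Authentication is not handled, although the route's `assert-authorized-user` check may require it on a secured cluster.

```python
import json
import urllib.parse
import urllib.request
from typing import Dict, Optional


def metrics_url(base: str, topology_id: str,
                window: Optional[str] = None, include_sys: bool = False) -> str:
    """Build the metrics URL; window and sys are the documented query params."""
    path = "/api/v1/topology/%s/metrics" % urllib.parse.quote(topology_id, safe="")
    params = {}
    if window is not None:
        params["window"] = window  # window duration in seconds
    if include_sys:
        params["sys"] = "1"  # include system streams/components
    query = ("?" + urllib.parse.urlencode(params)) if params else ""
    return base.rstrip("/") + path + query


def fetch_metrics(url: str, timeout: float = 10.0) -> dict:
    """GET the endpoint and decode the JSON body (no authentication)."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def values_by_stream(response: dict, section: str,
                     field: str) -> Dict[str, Dict[str, object]]:
    """Map component id -> {stream key -> value} for one field of spouts/bolts."""
    out: Dict[str, Dict[str, object]] = {}
    for comp in response.get(section, []):
        entries: Dict[str, object] = {}
        for e in comp.get(field, []):
            # Bolt input streams carry a source component id as well.
            if "component_id" in e:
                key = "%s:%s" % (e["component_id"], e["stream_id"])
            else:
                key = e["stream_id"]
            entries[key] = e["value"]
        out[comp["id"]] = entries
    return out
```

For example, `values_by_stream(fetch_metrics(metrics_url("http://ui-daemon-host-name:8080", "WordCount3-1-1402960825", window="600")), "bolts", "executed")` would return each bolt's executed counts per input stream for the 600-second window. The `component_id:stream_id` key format is a choice made in this sketch, not part of the API.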