Repository: storm Updated Branches: refs/heads/master afcb2a065 -> fa25f3d7f
http://git-wip-us.apache.org/repos/asf/storm/blob/afd2d525/storm-core/src/clj/org/apache/storm/ui/core.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj index 4b96620..bcf6e4f 100644 --- a/storm-core/src/clj/org/apache/storm/ui/core.clj +++ b/storm-core/src/clj/org/apache/storm/ui/core.clj @@ -21,13 +21,14 @@ ring.middleware.multipart-params) (:use [ring.middleware.json :only [wrap-json-params]]) (:use [hiccup core page-helpers]) - (:use [org.apache.storm config util log stats zookeeper converter]) + (:use [org.apache.storm config util log zookeeper converter]) (:use [org.apache.storm.ui helpers]) (:use [org.apache.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID ACKER-FAIL-STREAM-ID mk-authorization-handler start-metrics-reporters]]]) (:import [org.apache.storm.utils Time] - [org.apache.storm.generated NimbusSummary]) + [org.apache.storm.generated NimbusSummary] + [org.apache.storm.stats StatsUtil]) (:use [clojure.string :only [blank? lower-case trim split]]) (:import [org.apache.storm.generated ExecutorSpecificStats ExecutorStats ExecutorSummary ExecutorInfo TopologyInfo SpoutStats BoltStats @@ -109,7 +110,7 @@ (defn executor-summary-type [topology ^ExecutorSummary s] - (component-type topology (.get_component_id s))) + (StatsUtil/componentType topology (.get_component_id s))) (defn is-ack-stream [stream] @@ -119,6 +120,12 @@ ACKER-FAIL-STREAM-ID]] (every? #(not= %1 stream) acker-streams))) +(defn mk-include-sys-fn + [include-sys?] + (if include-sys? + (fn [_] true) + (fn [stream] (and (string? stream) (not (Utils/isSystemId stream)))))) + (defn spout-summary? [topology s] (= :spout (executor-summary-type topology s))) @@ -167,7 +174,7 @@ (defn get-error-data [error] (if error - (error-subset (.get_error ^ErrorInfo error)) + (StatsUtil/errorSubset (.get_error ^ErrorInfo error)) "")) (defn get-error-port @@ -234,23 +241,23 @@ bolt-summs (get bolt-comp-summs id) spout-summs (get spout-comp-summs id) bolt-cap (if bolt-summs - (compute-bolt-capacity bolt-summs) + (StatsUtil/computeBoltCapacity bolt-summs) 0)] {:type (if bolt-summs "bolt" "spout") :capacity bolt-cap :latency (if bolt-summs (get-in - (bolt-streams-stats bolt-summs true) + (clojurify-structure (StatsUtil/boltStreamsStats bolt-summs true)) [:process-latencies window]) (get-in - (spout-streams-stats spout-summs true) + (clojurify-structure (StatsUtil/spoutStreamsStats spout-summs true)) [:complete-latencies window])) :transferred (or (get-in - (spout-streams-stats spout-summs true) + (clojurify-structure (StatsUtil/spoutStreamsStats spout-summs true)) [:transferred window]) (get-in - (bolt-streams-stats bolt-summs true) + (clojurify-structure (StatsUtil/boltStreamsStats bolt-summs true)) [:transferred window])) :stats (let [mapfn (fn [dat] (map (fn [^ExecutorSummary summ] @@ -492,7 +499,7 @@ "window" w "emitted" (get-in stats [:emitted w]) "transferred" (get-in stats [:transferred w]) - "completeLatency" (float-str (get-in stats [:complete-latencies w])) + "completeLatency" (StatsUtil/floatStr (get-in stats [:complete-latencies w])) "acked" (get-in stats [:acked w]) "failed" (get-in stats [:failed w])}))) @@ -555,7 +562,7 @@ (get-error-json topo-id (.get_last_error s) secure?) {"spoutId" id "encodedSpoutId" (URLEncoder/encode id) - "completeLatency" (float-str (.get_complete_latency_ms ss))}))) + "completeLatency" (StatsUtil/floatStr (.get_complete_latency_ms ss))}))) (defmethod comp-agg-stats-json ComponentType/BOLT [topo-id secure? [id ^ComponentAggregateStats s]] @@ -566,10 +573,10 @@ (get-error-json topo-id (.get_last_error s) secure?) {"boltId" id "encodedBoltId" (URLEncoder/encode id) - "capacity" (float-str (.get_capacity ss)) - "executeLatency" (float-str (.get_execute_latency_ms ss)) + "capacity" (StatsUtil/floatStr (.get_capacity ss)) + "executeLatency" (StatsUtil/floatStr (.get_execute_latency_ms ss)) "executed" (.get_executed ss) - "processLatency" (float-str (.get_process_latency_ms ss))}))) + "processLatency" (StatsUtil/floatStr (.get_process_latency_ms ss))}))) (defn- unpack-topology-page-info "Unpacks the serialized object to data structures" @@ -679,10 +686,10 @@ "transferred" (.get_transferred comm-s) "acked" (.get_acked comm-s) "failed" (.get_failed comm-s) - "executeLatency" (float-str (.get_execute_latency_ms bolt-s)) - "processLatency" (float-str (.get_process_latency_ms bolt-s)) + "executeLatency" (StatsUtil/floatStr (.get_execute_latency_ms bolt-s)) + "processLatency" (StatsUtil/floatStr (.get_process_latency_ms bolt-s)) "executed" (.get_executed bolt-s) - "capacity" (float-str (.get_capacity bolt-s))})) + "capacity" (StatsUtil/floatStr (.get_capacity bolt-s))})) (defmethod unpack-comp-agg-stat ComponentType/SPOUT [[window ^ComponentAggregateStats s]] @@ -695,7 +702,7 @@ "transferred" (.get_transferred comm-s) "acked" (.get_acked comm-s) "failed" (.get_failed comm-s) - "completeLatency" (float-str (.get_complete_latency_ms spout-s))})) + "completeLatency" (StatsUtil/floatStr (.get_complete_latency_ms spout-s))})) (defn- unpack-bolt-input-stat [[^GlobalStreamId s ^ComponentAggregateStats stats]] @@ -706,8 +713,8 @@ {"component" comp-id "encodedComponentId" (URLEncoder/encode comp-id) "stream" (.get_streamId s) - "executeLatency" (float-str (.get_execute_latency_ms bas)) - "processLatency" (float-str (.get_process_latency_ms bas)) + "executeLatency" (StatsUtil/floatStr (.get_execute_latency_ms bas)) + "processLatency" (StatsUtil/floatStr (.get_process_latency_ms bas)) "executed" (Utils/nullToZero (.get_executed bas)) "acked" (Utils/nullToZero (.get_acked cas)) "failed" (Utils/nullToZero (.get_failed cas))})) @@ -730,7 +737,7 @@ {"stream" stream-id "emitted" (Utils/nullToZero (.get_emitted cas)) "transferred" (Utils/nullToZero (.get_transferred cas)) - "completeLatency" (float-str (.get_complete_latency_ms spout-s)) + "completeLatency" (StatsUtil/floatStr (.get_complete_latency_ms spout-s)) "acked" (Utils/nullToZero (.get_acked cas)) "failed" (Utils/nullToZero (.get_failed cas))})) @@ -757,10 +764,10 @@ "port" port "emitted" (Utils/nullToZero (.get_emitted cas)) "transferred" (Utils/nullToZero (.get_transferred cas)) - "capacity" (float-str (Utils/nullToZero (.get_capacity bas))) - "executeLatency" (float-str (.get_execute_latency_ms bas)) + "capacity" (StatsUtil/floatStr (Utils/nullToZero (.get_capacity bas))) + "executeLatency" (StatsUtil/floatStr (.get_execute_latency_ms bas)) "executed" (Utils/nullToZero (.get_executed bas)) - "processLatency" (float-str (.get_process_latency_ms bas)) + "processLatency" (StatsUtil/floatStr (.get_process_latency_ms bas)) "acked" (Utils/nullToZero (.get_acked cas)) "failed" (Utils/nullToZero (.get_failed cas)) "workerLogLink" (worker-log-link host port topology-id secure?)})) @@ -785,7 +792,7 @@ "port" port "emitted" (Utils/nullToZero (.get_emitted cas)) "transferred" (Utils/nullToZero (.get_transferred cas)) - "completeLatency" (float-str (.get_complete_latency_ms sas)) + "completeLatency" (StatsUtil/floatStr (.get_complete_latency_ms sas)) "acked" (Utils/nullToZero (.get_acked cas)) "failed" (Utils/nullToZero (.get_failed cas)) "workerLogLink" (worker-log-link host port topology-id secure?)})) http://git-wip-us.apache.org/repos/asf/storm/blob/afd2d525/storm-core/test/clj/org/apache/storm/nimbus_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj index ce58f42..a76db54 100644 --- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj +++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj @@ -15,14 +15,15 @@ ;; limitations under the License. (ns org.apache.storm.nimbus-test (:use [clojure test]) - (:require [org.apache.storm [util :as util] [stats :as stats]]) + (:require [org.apache.storm [util :as util]]) (:require [org.apache.storm.daemon [nimbus :as nimbus]]) (:require [org.apache.storm [converter :as converter]]) (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestPlannerSpout TestPlannerBolt] [org.apache.storm.nimbus InMemoryTopologyActionNotifier] [org.apache.storm.generated GlobalStreamId] - [org.apache.storm Thrift]) + [org.apache.storm Thrift] + [org.apache.storm.stats StatsUtil]) (:import [org.apache.storm.testing.staticmocking MockedZookeeper]) (:import [org.apache.storm.scheduler INimbus]) (:import [org.apache.storm.nimbus ILeaderElector NimbusInfo]) @@ -139,7 +140,8 @@ curr-beat (.get-worker-heartbeat state storm-id node port) stats (:executor-stats curr-beat)] (.worker-heartbeat! state storm-id node port - {:storm-id storm-id :time-secs (Time/currentTimeSecs) :uptime 10 :executor-stats (merge stats {executor (stats/render-stats! (stats/mk-bolt-stats 20))})} + {:storm-id storm-id :time-secs (Time/currentTimeSecs) :uptime 10 + :executor-stats (merge stats {executor (clojurify-structure (StatsUtil/renderStats (StatsUtil/mkBoltStats 20)))})} ))) (defn slot-assignments [cluster storm-id]
