1. Changed the heartbeat structure to a Java HashMap. 2. Switched StatsUtil to Java HashMaps instead of Clojure maps. 3. Updated the tests accordingly.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4c246d1c Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4c246d1c Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4c246d1c Branch: refs/heads/master Commit: 4c246d1c5582396debfad2a3687a243303e9a0e5 Parents: 9002528 Author: 卫乐 <[email protected]> Authored: Tue Mar 8 20:28:14 2016 +0800 Committer: 卫乐 <[email protected]> Committed: Tue Mar 8 20:28:14 2016 +0800 ---------------------------------------------------------------------- .../clj/org/apache/storm/command/heartbeats.clj | 5 +- .../src/clj/org/apache/storm/converter.clj | 25 - .../clj/org/apache/storm/daemon/executor.clj | 2 +- .../src/clj/org/apache/storm/daemon/nimbus.clj | 56 +- .../src/clj/org/apache/storm/daemon/worker.clj | 18 +- storm-core/src/clj/org/apache/storm/testing.clj | 8 +- storm-core/src/clj/org/apache/storm/ui/core.clj | 16 +- .../apache/storm/stats/BoltExecutorStats.java | 47 +- .../apache/storm/stats/SpoutExecutorStats.java | 37 +- .../jvm/org/apache/storm/stats/StatsUtil.java | 1281 +++++++++++------- .../test/clj/org/apache/storm/nimbus_test.clj | 17 +- 11 files changed, 835 insertions(+), 677 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/clj/org/apache/storm/command/heartbeats.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/heartbeats.clj b/storm-core/src/clj/org/apache/storm/command/heartbeats.clj index c4413f0..625cff7 100644 --- a/storm-core/src/clj/org/apache/storm/command/heartbeats.clj +++ b/storm-core/src/clj/org/apache/storm/command/heartbeats.clj @@ -22,7 +22,8 @@ [clojure.string :as string]) (:import [org.apache.storm.generated ClusterWorkerHeartbeat] [org.apache.storm.utils Utils ConfigUtils] - [org.apache.storm.cluster ZKStateStorage 
ClusterStateContext ClusterUtils]) + [org.apache.storm.cluster ZKStateStorage ClusterStateContext ClusterUtils] + [org.apache.storm.stats StatsUtil]) (:gen-class)) (defn -main [command path & args] @@ -37,7 +38,7 @@ "get" (log-message (if-let [hb (.get_worker_hb cluster path false)] - (clojurify-zk-worker-hb + (StatsUtil/convertZkWorkerHb (Utils/deserialize hb ClusterWorkerHeartbeat)) http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/clj/org/apache/storm/converter.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/converter.clj b/storm-core/src/clj/org/apache/storm/converter.clj index 495fe7f..6bd7e72 100644 --- a/storm-core/src/clj/org/apache/storm/converter.clj +++ b/storm-core/src/clj/org/apache/storm/converter.clj @@ -215,31 +215,6 @@ (convert-to-symbol-from-status (.get_prev_status storm-base)) (map-val clojurify-debugoptions (.get_component_debug storm-base))))) -(defn clojurify-zk-worker-hb [^ClusterWorkerHeartbeat worker-hb] - (if worker-hb - {:storm-id (.get_storm_id worker-hb) - :executor-stats (clojurify-structure (StatsUtil/clojurifyStats (into {} (.get_executor_stats worker-hb)))) - :uptime (.get_uptime_secs worker-hb) - :time-secs (.get_time_secs worker-hb) - } - {})) - -(defn clojurify-zk-executor-hb [^ExecutorBeat executor-hb] - (if executor-hb - {:stats (StatsUtil/clojurifyExecutorStats (.getStats executor-hb)) - :uptime (.getUptime executor-hb) - :time-secs (.getTimeSecs executor-hb) - } - {})) - -(defn thriftify-zk-worker-hb [worker-hb] - (if (not-empty (filter second (:executor-stats worker-hb))) - (doto (ClusterWorkerHeartbeat.) - (.set_uptime_secs (:uptime worker-hb)) - (.set_storm_id (:storm-id worker-hb)) - (.set_executor_stats (StatsUtil/thriftifyStats (filter second (:executor-stats worker-hb)))) - (.set_time_secs (:time-secs worker-hb))))) - (defn thriftify-error [error] (doto (ErrorInfo. 
(:error error) (:time-secs error)) (.set_host (:host error)) http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index 4bbce10..becd8f3 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -406,7 +406,7 @@ (reify RunningExecutor (render-stats [this] - (clojurify-structure (.renderStats (:stats executor-data)))) + (.renderStats (:stats executor-data))) (get-executor-id [this] executor-id) (credentials-changed [this creds] http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index 83f73d5..997f92c 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -559,48 +559,17 @@ executor->component (:launch-time-secs storm-base)))) -;; Does not assume that clocks are synchronized. Executor heartbeat is only used so that -;; nimbus knows when it's received a new heartbeat. 
All timing is done by nimbus and -;; tracked through heartbeat-cache -(defn- update-executor-cache [curr hb timeout] - (let [reported-time (:time-secs hb) - {last-nimbus-time :nimbus-time - last-reported-time :executor-reported-time} curr - reported-time (cond reported-time reported-time - last-reported-time last-reported-time - :else 0) - nimbus-time (if (or (not last-nimbus-time) - (not= last-reported-time reported-time)) - (Time/currentTimeSecs) - last-nimbus-time - )] - {:is-timed-out (and - nimbus-time - (>= (Time/deltaSecs nimbus-time) timeout)) - :nimbus-time nimbus-time - :executor-reported-time reported-time - :heartbeat hb})) - -(defn update-heartbeat-cache [cache executor-beats all-executors timeout] - (let [cache (select-keys cache all-executors)] - (into {} - (for [executor all-executors :let [curr (cache executor)]] - [executor - (update-executor-cache curr (get executor-beats executor) timeout)] - )))) (defn update-heartbeats! [nimbus storm-id all-executors existing-assignment] (log-debug "Updating heartbeats for " storm-id " " (pr-str all-executors)) (let [storm-cluster-state (:storm-cluster-state nimbus) - executor-beats (let [executor-stats-java-map (.executorBeats storm-cluster-state storm-id (.get_executor_node_port (thriftify-assignment existing-assignment))) - executor-stats-clojurify (clojurify-structure executor-stats-java-map)] - (->> (dofor [[^ExecutorInfo executor-info ^ExecutorBeat executor-heartbeat] executor-stats-clojurify] - {[(.get_task_start executor-info) (.get_task_end executor-info)] (clojurify-zk-executor-hb executor-heartbeat)}) - (apply merge))) - cache (update-heartbeat-cache (@(:heartbeats-cache nimbus) storm-id) + executor-beats (let [executor-stats-java-map (.executorBeats storm-cluster-state storm-id + (.get_executor_node_port (thriftify-assignment existing-assignment)))] + (StatsUtil/convertExecutorBeats executor-stats-java-map)) + cache (StatsUtil/updateHeartbeatCache (@(:heartbeats-cache nimbus) storm-id) 
executor-beats - all-executors - ((:conf nimbus) NIMBUS-TASK-TIMEOUT-SECS))] + (StatsUtil/convertExecutors all-executors) + (int ((:conf nimbus) NIMBUS-TASK-TIMEOUT-SECS)))] (swap! (:heartbeats-cache nimbus) assoc storm-id cache))) (defn- update-all-heartbeats! [nimbus existing-assignments topology->executors] @@ -625,7 +594,7 @@ (->> all-executors (filter (fn [executor] (let [start-time (get executor-start-times executor) - is-timed-out (-> heartbeats-cache (get executor) :is-timed-out)] + is-timed-out (.get (.get heartbeats-cache (StatsUtil/convertExecutor executor)) "is-timed-out")] (if (and start-time (or (< (Time/deltaSecs start-time) @@ -1415,8 +1384,7 @@ (throw (NotAliveException. (str storm-id)))) assignment (clojurify-assignment (.assignmentInfo storm-cluster-state storm-id nil)) - beats (map-val :heartbeat (get @(:heartbeats-cache nimbus) - storm-id)) + beats (get @(:heartbeats-cache nimbus) storm-id) all-components (set (vals task->component))] {:storm-name storm-name :storm-cluster-state storm-cluster-state @@ -1919,9 +1887,9 @@ (map (fn [c] [c (errors-fn storm-cluster-state storm-id c)])) (into {})) executor-summaries (dofor [[executor [node port]] (:executor->node+port assignment)] - (let [host (-> assignment :node->host (get node)) - heartbeat (get beats executor) - excutorstats (:stats heartbeat) + (let [host (-> assignment :node->host (get node)) + heartbeat (.get beats (StatsUtil/convertExecutor executor)) + excutorstats (.get (.get heartbeat "heartbeat") "stats") excutorstats (if excutorstats (StatsUtil/thriftifyExecutorStats excutorstats))] @@ -1930,7 +1898,7 @@ (-> executor first task->component) host port - (Utils/nullToZero (:uptime heartbeat))) + (Utils/nullToZero (.get heartbeat "uptime"))) (.set_stats excutorstats)) )) topo-info (TopologyInfo. 
storm-id http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/clj/org/apache/storm/daemon/worker.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj index 92ba807..10a1e47 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj @@ -21,7 +21,8 @@ (:require [org.apache.storm.daemon [executor :as executor]]) (:require [clojure.set :as set]) - (:import [java.io File]) + (:import [java.io File] + [org.apache.storm.stats StatsUtil]) (:import [java.util.concurrent Executors] [org.apache.storm.hooks IWorkerHook BaseWorkerHook] [uk.org.lidalia.sysoutslf4j.context SysOutOverSLF4J]) @@ -66,18 +67,15 @@ (defnk do-executor-heartbeats [worker :executors nil] ;; stats is how we know what executors are assigned to this worker (let [stats (if-not executors - (into {} (map (fn [e] {e nil}) (:executors worker))) - (->> executors + (StatsUtil/mkEmptyExecutorZkHbs (:executors worker)) + (StatsUtil/convertExecutorZkHbs (->> executors (map (fn [e] {(executor/get-executor-id e) (executor/render-stats e)})) - (apply merge))) - zk-hb {:storm-id (:storm-id worker) - :executor-stats stats - :uptime (. (:uptime worker) upTime) - :time-secs (Time/currentTimeSecs) - }] + (apply merge)))) + zk-hb (StatsUtil/mkZkWorkerHb (:storm-id worker) stats (. 
(:uptime worker) upTime))] ;; do the zookeeper heartbeat (try - (.workerHeartbeat (:storm-cluster-state worker) (:storm-id worker) (:assignment-id worker) (long (:port worker)) (thriftify-zk-worker-hb zk-hb)) + (.workerHeartbeat (:storm-cluster-state worker) (:storm-id worker) (:assignment-id worker) (long (:port worker)) + (StatsUtil/thriftifyZkWorkerHb zk-hb)) (catch Exception exc (log-error exc "Worker failed to write heatbeats to ZK or Pacemaker...will retry"))))) http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/clj/org/apache/storm/testing.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj index 66fc051..419cf2b 100644 --- a/storm-core/src/clj/org/apache/storm/testing.clj +++ b/storm-core/src/clj/org/apache/storm/testing.clj @@ -452,7 +452,7 @@ assignment (clojurify-assignment (.assignmentInfo state storm-id nil)) taskbeats (.taskbeats state storm-id (:task->node+port assignment)) heartbeats (dofor [id task-ids] (get taskbeats id)) - stats (dofor [hb heartbeats] (if hb (stat-key (:stats hb)) 0))] + stats (dofor [hb heartbeats] (if hb (.get (.get hb "stats") stat-key) 0))] (reduce + stats))) (defn emitted-spout-tuples @@ -460,16 +460,16 @@ (aggregated-stat cluster-map storm-name - :emitted + "emitted" :component-ids (keys (.get_spouts topology)))) (defn transferred-tuples [cluster-map storm-name] - (aggregated-stat cluster-map storm-name :transferred)) + (aggregated-stat cluster-map storm-name "transferred")) (defn acked-tuples [cluster-map storm-name] - (aggregated-stat cluster-map storm-name :acked)) + (aggregated-stat cluster-map storm-name "acked")) (defn simulate-wait [cluster-map] http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/clj/org/apache/storm/ui/core.clj ---------------------------------------------------------------------- diff --git 
a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj index b9cf2d7..0730d96 100644 --- a/storm-core/src/clj/org/apache/storm/ui/core.clj +++ b/storm-core/src/clj/org/apache/storm/ui/core.clj @@ -124,11 +124,11 @@ (defn spout-summary? [topology s] - (= :spout (executor-summary-type topology s))) + (= "spout" (executor-summary-type topology s))) (defn bolt-summary? [topology s] - (= :bolt (executor-summary-type topology s))) + (= "bolt" (executor-summary-type topology s))) (defn group-by-comp [summs] @@ -230,8 +230,8 @@ (let [components (for [[id spec] spout-bolt] [id (let [inputs (.get_inputs (.get_common spec)) - bolt-summs (get bolt-comp-summs id) - spout-summs (get spout-comp-summs id) + bolt-summs (.get bolt-comp-summs id) + spout-summs (.get spout-comp-summs id) bolt-cap (if bolt-summs (StatsUtil/computeBoltCapacity bolt-summs) 0)] @@ -240,17 +240,17 @@ :latency (if bolt-summs (get-in (clojurify-structure (StatsUtil/boltStreamsStats bolt-summs true)) - [:process-latencies window]) + ["process-latencies" window]) (get-in (clojurify-structure (StatsUtil/spoutStreamsStats spout-summs true)) - [:complete-latencies window])) + ["complete-latencies" window])) :transferred (or (get-in (clojurify-structure (StatsUtil/spoutStreamsStats spout-summs true)) - [:transferred window]) + ["transferred" window]) (get-in (clojurify-structure (StatsUtil/boltStreamsStats bolt-summs true)) - [:transferred window])) + ["transferred" window])) :stats (let [mapfn (fn [dat] (map (fn [^ExecutorSummary summ] {:host (.get_host summ) http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java index d8c7f06..e26e56b 100644 --- 
a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java +++ b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java @@ -18,9 +18,10 @@ package org.apache.storm.stats; import com.google.common.collect.Lists; -import java.util.HashMap; import java.util.List; -import java.util.Map; +import org.apache.storm.generated.BoltStats; +import org.apache.storm.generated.ExecutorSpecificStats; +import org.apache.storm.generated.ExecutorStats; import org.apache.storm.metric.internal.MultiCountStatAndMetric; import org.apache.storm.metric.internal.MultiLatencyStatAndMetric; @@ -33,8 +34,6 @@ public class BoltExecutorStats extends CommonStats { public static final String PROCESS_LATENCIES = "process-latencies"; public static final String EXECUTE_LATENCIES = "execute-latencies"; - public static final String[] BOLT_FIELDS = {ACKED, FAILED, EXECUTED, PROCESS_LATENCIES, EXECUTE_LATENCIES}; - public BoltExecutorStats(int rate) { super(rate); @@ -83,32 +82,24 @@ public class BoltExecutorStats extends CommonStats { } - public Map renderStats() { + public ExecutorStats renderStats() { cleanupStats(); - Map ret = new HashMap(); - ret.putAll(valueStats(CommonStats.COMMON_FIELDS)); - ret.putAll(valueStats(BoltExecutorStats.BOLT_FIELDS)); - StatsUtil.putKV(ret, StatsUtil.TYPE, StatsUtil.KW_BOLT); + + ExecutorStats ret = new ExecutorStats(); + // common stats + ret.set_emitted(valueStat(EMITTED)); + ret.set_transferred(valueStat(TRANSFERRED)); + ret.set_rate(this.rate); + + // bolt stats + BoltStats boltStats = new BoltStats( + StatsUtil.windowSetConverter(valueStat(ACKED), StatsUtil.TO_GSID, StatsUtil.IDENTITY), + StatsUtil.windowSetConverter(valueStat(FAILED), StatsUtil.TO_GSID, StatsUtil.IDENTITY), + StatsUtil.windowSetConverter(valueStat(PROCESS_LATENCIES), StatsUtil.TO_GSID, StatsUtil.IDENTITY), + StatsUtil.windowSetConverter(valueStat(EXECUTED), StatsUtil.TO_GSID, StatsUtil.IDENTITY), + StatsUtil.windowSetConverter(valueStat(EXECUTE_LATENCIES), StatsUtil.TO_GSID, 
StatsUtil.IDENTITY)); + ret.set_specific(ExecutorSpecificStats.bolt(boltStats)); return ret; } - -// public ExecutorStats renderStats() { -// cleanupStats(); -// -// ExecutorStats ret = new ExecutorStats(); -// ret.set_emitted(valueStat(EMITTED)); -// ret.set_transferred(valueStat(TRANSFERRED)); -// ret.set_rate(this.rate); -// -// BoltStats boltStats = new BoltStats( -// StatsUtil.windowSetConverter(valueStat(ACKED), StatsUtil.TO_GSID, StatsUtil.IDENTITY), -// StatsUtil.windowSetConverter(valueStat(FAILED), StatsUtil.TO_GSID, StatsUtil.IDENTITY), -// StatsUtil.windowSetConverter(valueStat(PROCESS_LATENCIES), StatsUtil.TO_GSID, StatsUtil.IDENTITY), -// StatsUtil.windowSetConverter(valueStat(EXECUTED), StatsUtil.TO_GSID, StatsUtil.IDENTITY), -// StatsUtil.windowSetConverter(valueStat(EXECUTE_LATENCIES), StatsUtil.TO_GSID, StatsUtil.IDENTITY)); -// ret.set_specific(ExecutorSpecificStats.bolt(boltStats)); -// -// return ret; -// } } http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java index 27c626e..3c09a38 100644 --- a/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java +++ b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java @@ -19,6 +19,9 @@ package org.apache.storm.stats; import java.util.HashMap; import java.util.Map; +import org.apache.storm.generated.ExecutorSpecificStats; +import org.apache.storm.generated.ExecutorStats; +import org.apache.storm.generated.SpoutStats; import org.apache.storm.metric.internal.MultiCountStatAndMetric; import org.apache.storm.metric.internal.MultiLatencyStatAndMetric; @@ -29,8 +32,6 @@ public class SpoutExecutorStats extends CommonStats { public static final String FAILED = "failed"; public static final String 
COMPLETE_LATENCIES = "complete-latencies"; - public static final String[] SPOUT_FIELDS = {ACKED, FAILED, COMPLETE_LATENCIES}; - public SpoutExecutorStats(int rate) { super(rate); this.put(ACKED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS)); @@ -59,28 +60,20 @@ public class SpoutExecutorStats extends CommonStats { this.getFailed().incBy(stream, this.rate); } - public Map renderStats() { + public ExecutorStats renderStats() { cleanupStats(); - Map ret = new HashMap(); - ret.putAll(valueStats(CommonStats.COMMON_FIELDS)); - ret.putAll(valueStats(SpoutExecutorStats.SPOUT_FIELDS)); - StatsUtil.putKV(ret, StatsUtil.TYPE, StatsUtil.KW_SPOUT); + + ExecutorStats ret = new ExecutorStats(); + // common fields + ret.set_emitted(valueStat(EMITTED)); + ret.set_transferred(valueStat(TRANSFERRED)); + ret.set_rate(this.rate); + + // spout stats + SpoutStats spoutStats = new SpoutStats( + valueStat(ACKED), valueStat(FAILED), valueStat(COMPLETE_LATENCIES)); + ret.set_specific(ExecutorSpecificStats.spout(spoutStats)); return ret; } - -// public ExecutorStats renderStats() { -// cleanupStats(); -// -// ExecutorStats ret = new ExecutorStats(); -// ret.set_emitted(valueStat(EMITTED)); -// ret.set_transferred(valueStat(TRANSFERRED)); -// ret.set_rate(this.rate); -// -// SpoutStats spoutStats = new SpoutStats( -// valueStat(ACKED), valueStat(FAILED), valueStat(COMPLETE_LATENCIES)); -// ret.set_specific(ExecutorSpecificStats.spout(spoutStats)); -// -// return ret; -// } }
