1. Changed the heartbeat structure to a Java HashMap
2. Changed StatsUtil to use Java HashMaps instead of Clojure maps
3. Changed the tests accordingly


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4c246d1c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4c246d1c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4c246d1c

Branch: refs/heads/master
Commit: 4c246d1c5582396debfad2a3687a243303e9a0e5
Parents: 9002528
Author: 卫乐 <[email protected]>
Authored: Tue Mar 8 20:28:14 2016 +0800
Committer: 卫乐 <[email protected]>
Committed: Tue Mar 8 20:28:14 2016 +0800

----------------------------------------------------------------------
 .../clj/org/apache/storm/command/heartbeats.clj |    5 +-
 .../src/clj/org/apache/storm/converter.clj      |   25 -
 .../clj/org/apache/storm/daemon/executor.clj    |    2 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |   56 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |   18 +-
 storm-core/src/clj/org/apache/storm/testing.clj |    8 +-
 storm-core/src/clj/org/apache/storm/ui/core.clj |   16 +-
 .../apache/storm/stats/BoltExecutorStats.java   |   47 +-
 .../apache/storm/stats/SpoutExecutorStats.java  |   37 +-
 .../jvm/org/apache/storm/stats/StatsUtil.java   | 1281 +++++++++++-------
 .../test/clj/org/apache/storm/nimbus_test.clj   |   17 +-
 11 files changed, 835 insertions(+), 677 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/clj/org/apache/storm/command/heartbeats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/heartbeats.clj b/storm-core/src/clj/org/apache/storm/command/heartbeats.clj
index c4413f0..625cff7 100644
--- a/storm-core/src/clj/org/apache/storm/command/heartbeats.clj
+++ b/storm-core/src/clj/org/apache/storm/command/heartbeats.clj
@@ -22,7 +22,8 @@
             [clojure.string :as string])
   (:import [org.apache.storm.generated ClusterWorkerHeartbeat]
            [org.apache.storm.utils Utils ConfigUtils]
-           [org.apache.storm.cluster ZKStateStorage ClusterStateContext 
ClusterUtils])
+           [org.apache.storm.cluster ZKStateStorage ClusterStateContext 
ClusterUtils]
+           [org.apache.storm.stats StatsUtil])
   (:gen-class))
 
 (defn -main [command path & args]
@@ -37,7 +38,7 @@
       "get"
       (log-message 
        (if-let [hb (.get_worker_hb cluster path false)]
-         (clojurify-zk-worker-hb
+         (StatsUtil/convertZkWorkerHb
           (Utils/deserialize
            hb
            ClusterWorkerHeartbeat))

http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/clj/org/apache/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/converter.clj b/storm-core/src/clj/org/apache/storm/converter.clj
index 495fe7f..6bd7e72 100644
--- a/storm-core/src/clj/org/apache/storm/converter.clj
+++ b/storm-core/src/clj/org/apache/storm/converter.clj
@@ -215,31 +215,6 @@
       (convert-to-symbol-from-status (.get_prev_status storm-base))
       (map-val clojurify-debugoptions (.get_component_debug storm-base)))))
 
-(defn clojurify-zk-worker-hb [^ClusterWorkerHeartbeat worker-hb]
-  (if worker-hb
-    {:storm-id (.get_storm_id worker-hb)
-     :executor-stats (clojurify-structure (StatsUtil/clojurifyStats (into {} 
(.get_executor_stats worker-hb))))
-     :uptime (.get_uptime_secs worker-hb)
-     :time-secs (.get_time_secs worker-hb)
-     }
-    {}))
-
-(defn clojurify-zk-executor-hb [^ExecutorBeat executor-hb]
-  (if executor-hb
-    {:stats (StatsUtil/clojurifyExecutorStats (.getStats executor-hb))
-     :uptime (.getUptime executor-hb)
-     :time-secs (.getTimeSecs executor-hb)
-     }
-    {}))
-
-(defn thriftify-zk-worker-hb [worker-hb]
-  (if (not-empty (filter second (:executor-stats worker-hb)))
-    (doto (ClusterWorkerHeartbeat.)
-      (.set_uptime_secs (:uptime worker-hb))
-      (.set_storm_id (:storm-id worker-hb))
-      (.set_executor_stats (StatsUtil/thriftifyStats (filter second 
(:executor-stats worker-hb))))
-      (.set_time_secs (:time-secs worker-hb)))))
-
 (defn thriftify-error [error]
   (doto (ErrorInfo. (:error error) (:time-secs error))
     (.set_host (:host error))

http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 4bbce10..becd8f3 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -406,7 +406,7 @@
     (reify
       RunningExecutor
       (render-stats [this]
-        (clojurify-structure (.renderStats (:stats executor-data))))
+        (.renderStats (:stats executor-data)))
       (get-executor-id [this]
         executor-id)
       (credentials-changed [this creds]

http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 83f73d5..997f92c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -559,48 +559,17 @@
                       executor->component
                       (:launch-time-secs storm-base))))
 
-;; Does not assume that clocks are synchronized. Executor heartbeat is only 
used so that
-;; nimbus knows when it's received a new heartbeat. All timing is done by 
nimbus and
-;; tracked through heartbeat-cache
-(defn- update-executor-cache [curr hb timeout]
-  (let [reported-time (:time-secs hb)
-        {last-nimbus-time :nimbus-time
-         last-reported-time :executor-reported-time} curr
-        reported-time (cond reported-time reported-time
-                            last-reported-time last-reported-time
-                            :else 0)
-        nimbus-time (if (or (not last-nimbus-time)
-                        (not= last-reported-time reported-time))
-                      (Time/currentTimeSecs)
-                      last-nimbus-time
-                      )]
-      {:is-timed-out (and
-                       nimbus-time
-                       (>= (Time/deltaSecs nimbus-time) timeout))
-       :nimbus-time nimbus-time
-       :executor-reported-time reported-time
-       :heartbeat hb}))
-
-(defn update-heartbeat-cache [cache executor-beats all-executors timeout]
-  (let [cache (select-keys cache all-executors)]
-    (into {}
-      (for [executor all-executors :let [curr (cache executor)]]
-        [executor
-         (update-executor-cache curr (get executor-beats executor) timeout)]
-         ))))
 
 (defn update-heartbeats! [nimbus storm-id all-executors existing-assignment]
   (log-debug "Updating heartbeats for " storm-id " " (pr-str all-executors))
   (let [storm-cluster-state (:storm-cluster-state nimbus)
-        executor-beats (let [executor-stats-java-map (.executorBeats 
storm-cluster-state storm-id (.get_executor_node_port (thriftify-assignment 
existing-assignment)))
-                             executor-stats-clojurify (clojurify-structure 
executor-stats-java-map)]
-                         (->> (dofor [[^ExecutorInfo executor-info  
^ExecutorBeat executor-heartbeat] executor-stats-clojurify]
-                             {[(.get_task_start executor-info) (.get_task_end 
executor-info)] (clojurify-zk-executor-hb executor-heartbeat)})
-                           (apply merge)))
-        cache (update-heartbeat-cache (@(:heartbeats-cache nimbus) storm-id)
+        executor-beats (let [executor-stats-java-map (.executorBeats 
storm-cluster-state storm-id
+                                                       
(.get_executor_node_port (thriftify-assignment existing-assignment)))]
+                         (StatsUtil/convertExecutorBeats 
executor-stats-java-map))
+        cache (StatsUtil/updateHeartbeatCache (@(:heartbeats-cache nimbus) 
storm-id)
                                       executor-beats
-                                      all-executors
-                                      ((:conf nimbus) 
NIMBUS-TASK-TIMEOUT-SECS))]
+                                      (StatsUtil/convertExecutors 
all-executors)
+                                      (int ((:conf nimbus) 
NIMBUS-TASK-TIMEOUT-SECS)))]
       (swap! (:heartbeats-cache nimbus) assoc storm-id cache)))
 
 (defn- update-all-heartbeats! [nimbus existing-assignments topology->executors]
@@ -625,7 +594,7 @@
     (->> all-executors
         (filter (fn [executor]
           (let [start-time (get executor-start-times executor)
-                is-timed-out (-> heartbeats-cache (get executor) 
:is-timed-out)]
+                is-timed-out (.get (.get heartbeats-cache 
(StatsUtil/convertExecutor executor)) "is-timed-out")]
             (if (and start-time
                    (or
                     (< (Time/deltaSecs start-time)
@@ -1415,8 +1384,7 @@
                                      (throw
                                        (NotAliveException. (str storm-id))))
                   assignment (clojurify-assignment (.assignmentInfo 
storm-cluster-state storm-id nil))
-                  beats (map-val :heartbeat (get @(:heartbeats-cache nimbus)
-                                                 storm-id))
+                  beats (get @(:heartbeats-cache nimbus) storm-id)
                   all-components (set (vals task->component))]
               {:storm-name storm-name
                :storm-cluster-state storm-cluster-state
@@ -1919,9 +1887,9 @@
                           (map (fn [c] [c (errors-fn storm-cluster-state 
storm-id c)]))
                           (into {}))
               executor-summaries (dofor [[executor [node port]] 
(:executor->node+port assignment)]
-                                        (let [host (-> assignment :node->host 
(get node))
-                                              heartbeat (get beats executor)
-                                              excutorstats (:stats heartbeat)
+                                     (let [host (-> assignment :node->host 
(get node))
+                                              heartbeat (.get beats 
(StatsUtil/convertExecutor executor))
+                                              excutorstats (.get (.get 
heartbeat "heartbeat") "stats")
                                               excutorstats (if excutorstats
                                                       
(StatsUtil/thriftifyExecutorStats excutorstats))]
                                               
@@ -1930,7 +1898,7 @@
                                                                 (-> executor 
first task->component)
                                                                 host
                                                                 port
-                                                                
(Utils/nullToZero (:uptime heartbeat)))
+                                                                
(Utils/nullToZero (.get heartbeat "uptime")))
                                             (.set_stats excutorstats))
                                           ))
               topo-info  (TopologyInfo. storm-id

http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 92ba807..10a1e47 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -21,7 +21,8 @@
   (:require [org.apache.storm.daemon [executor :as executor]])
 
   (:require [clojure.set :as set])
-  (:import [java.io File])
+  (:import [java.io File]
+           [org.apache.storm.stats StatsUtil])
   (:import [java.util.concurrent Executors]
            [org.apache.storm.hooks IWorkerHook BaseWorkerHook]
            [uk.org.lidalia.sysoutslf4j.context SysOutOverSLF4J])
@@ -66,18 +67,15 @@
 (defnk do-executor-heartbeats [worker :executors nil]
   ;; stats is how we know what executors are assigned to this worker 
   (let [stats (if-not executors
-                  (into {} (map (fn [e] {e nil}) (:executors worker)))
-                  (->> executors
+                  (StatsUtil/mkEmptyExecutorZkHbs (:executors worker))
+                  (StatsUtil/convertExecutorZkHbs (->> executors
                     (map (fn [e] {(executor/get-executor-id e) 
(executor/render-stats e)}))
-                    (apply merge)))
-        zk-hb {:storm-id (:storm-id worker)
-               :executor-stats stats
-               :uptime (. (:uptime worker) upTime)
-               :time-secs (Time/currentTimeSecs)
-               }]
+                    (apply merge))))
+        zk-hb (StatsUtil/mkZkWorkerHb (:storm-id worker) stats (. (:uptime 
worker) upTime))]
     ;; do the zookeeper heartbeat
     (try
-      (.workerHeartbeat (:storm-cluster-state worker) (:storm-id worker) 
(:assignment-id worker) (long (:port worker)) (thriftify-zk-worker-hb zk-hb))
+      (.workerHeartbeat (:storm-cluster-state worker) (:storm-id worker) 
(:assignment-id worker) (long (:port worker))
+        (StatsUtil/thriftifyZkWorkerHb zk-hb))
       (catch Exception exc
         (log-error exc "Worker failed to write heatbeats to ZK or 
Pacemaker...will retry")))))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 66fc051..419cf2b 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -452,7 +452,7 @@
         assignment (clojurify-assignment (.assignmentInfo state storm-id nil))
         taskbeats (.taskbeats state storm-id (:task->node+port assignment))
         heartbeats (dofor [id task-ids] (get taskbeats id))
-        stats (dofor [hb heartbeats] (if hb (stat-key (:stats hb)) 0))]
+        stats (dofor [hb heartbeats] (if hb (.get (.get hb "stats") stat-key) 
0))]
     (reduce + stats)))
 
 (defn emitted-spout-tuples
@@ -460,16 +460,16 @@
   (aggregated-stat
     cluster-map
     storm-name
-    :emitted
+    "emitted"
     :component-ids (keys (.get_spouts topology))))
 
 (defn transferred-tuples
   [cluster-map storm-name]
-  (aggregated-stat cluster-map storm-name :transferred))
+  (aggregated-stat cluster-map storm-name "transferred"))
 
 (defn acked-tuples
   [cluster-map storm-name]
-  (aggregated-stat cluster-map storm-name :acked))
+  (aggregated-stat cluster-map storm-name "acked"))
 
 (defn simulate-wait
   [cluster-map]

http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index b9cf2d7..0730d96 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -124,11 +124,11 @@
 
 (defn spout-summary?
   [topology s]
-  (= :spout (executor-summary-type topology s)))
+  (= "spout" (executor-summary-type topology s)))
 
 (defn bolt-summary?
   [topology s]
-  (= :bolt (executor-summary-type topology s)))
+  (= "bolt" (executor-summary-type topology s)))
 
 (defn group-by-comp
   [summs]
@@ -230,8 +230,8 @@
   (let [components (for [[id spec] spout-bolt]
             [id
              (let [inputs (.get_inputs (.get_common spec))
-                   bolt-summs (get bolt-comp-summs id)
-                   spout-summs (get spout-comp-summs id)
+                   bolt-summs (.get bolt-comp-summs id)
+                   spout-summs (.get spout-comp-summs id)
                    bolt-cap (if bolt-summs
                               (StatsUtil/computeBoltCapacity bolt-summs)
                               0)]
@@ -240,17 +240,17 @@
                 :latency (if bolt-summs
                            (get-in
                              (clojurify-structure (StatsUtil/boltStreamsStats 
bolt-summs true))
-                             [:process-latencies window])
+                             ["process-latencies" window])
                            (get-in
                              (clojurify-structure (StatsUtil/spoutStreamsStats 
spout-summs true))
-                             [:complete-latencies window]))
+                             ["complete-latencies" window]))
                 :transferred (or
                                (get-in
                                  (clojurify-structure 
(StatsUtil/spoutStreamsStats spout-summs true))
-                                 [:transferred window])
+                                 ["transferred" window])
                                (get-in
                                  (clojurify-structure 
(StatsUtil/boltStreamsStats bolt-summs true))
-                                 [:transferred window]))
+                                 ["transferred" window]))
                 :stats (let [mapfn (fn [dat]
                                      (map (fn [^ExecutorSummary summ]
                                             {:host (.get_host summ)

http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
index d8c7f06..e26e56b 100644
--- a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
@@ -18,9 +18,10 @@
 package org.apache.storm.stats;
 
 import com.google.common.collect.Lists;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import org.apache.storm.generated.BoltStats;
+import org.apache.storm.generated.ExecutorSpecificStats;
+import org.apache.storm.generated.ExecutorStats;
 import org.apache.storm.metric.internal.MultiCountStatAndMetric;
 import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
 
@@ -33,8 +34,6 @@ public class BoltExecutorStats extends CommonStats {
     public static final String PROCESS_LATENCIES = "process-latencies";
     public static final String EXECUTE_LATENCIES = "execute-latencies";
 
-    public static final String[] BOLT_FIELDS = {ACKED, FAILED, EXECUTED, 
PROCESS_LATENCIES, EXECUTE_LATENCIES};
-
     public BoltExecutorStats(int rate) {
         super(rate);
 
@@ -83,32 +82,24 @@ public class BoltExecutorStats extends CommonStats {
 
     }
 
-    public Map renderStats() {
+    public ExecutorStats renderStats() {
         cleanupStats();
-        Map ret = new HashMap();
-        ret.putAll(valueStats(CommonStats.COMMON_FIELDS));
-        ret.putAll(valueStats(BoltExecutorStats.BOLT_FIELDS));
-        StatsUtil.putKV(ret, StatsUtil.TYPE, StatsUtil.KW_BOLT);
+
+        ExecutorStats ret = new ExecutorStats();
+        // common stats
+        ret.set_emitted(valueStat(EMITTED));
+        ret.set_transferred(valueStat(TRANSFERRED));
+        ret.set_rate(this.rate);
+
+        // bolt stats
+        BoltStats boltStats = new BoltStats(
+                StatsUtil.windowSetConverter(valueStat(ACKED), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY),
+                StatsUtil.windowSetConverter(valueStat(FAILED), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY),
+                StatsUtil.windowSetConverter(valueStat(PROCESS_LATENCIES), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY),
+                StatsUtil.windowSetConverter(valueStat(EXECUTED), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY),
+                StatsUtil.windowSetConverter(valueStat(EXECUTE_LATENCIES), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY));
+        ret.set_specific(ExecutorSpecificStats.bolt(boltStats));
 
         return ret;
     }
-
-//    public ExecutorStats renderStats() {
-//        cleanupStats();
-//
-//        ExecutorStats ret = new ExecutorStats();
-//        ret.set_emitted(valueStat(EMITTED));
-//        ret.set_transferred(valueStat(TRANSFERRED));
-//        ret.set_rate(this.rate);
-//
-//        BoltStats boltStats = new BoltStats(
-//                StatsUtil.windowSetConverter(valueStat(ACKED), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY),
-//                StatsUtil.windowSetConverter(valueStat(FAILED), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY),
-//                StatsUtil.windowSetConverter(valueStat(PROCESS_LATENCIES), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY),
-//                StatsUtil.windowSetConverter(valueStat(EXECUTED), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY),
-//                StatsUtil.windowSetConverter(valueStat(EXECUTE_LATENCIES), 
StatsUtil.TO_GSID, StatsUtil.IDENTITY));
-//        ret.set_specific(ExecutorSpecificStats.bolt(boltStats));
-//
-//        return ret;
-//    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
index 27c626e..3c09a38 100644
--- a/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
@@ -19,6 +19,9 @@ package org.apache.storm.stats;
 
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.storm.generated.ExecutorSpecificStats;
+import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.generated.SpoutStats;
 import org.apache.storm.metric.internal.MultiCountStatAndMetric;
 import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
 
@@ -29,8 +32,6 @@ public class SpoutExecutorStats extends CommonStats {
     public static final String FAILED = "failed";
     public static final String COMPLETE_LATENCIES = "complete-latencies";
 
-    public static final String[] SPOUT_FIELDS = {ACKED, FAILED, 
COMPLETE_LATENCIES};
-
     public SpoutExecutorStats(int rate) {
         super(rate);
         this.put(ACKED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS));
@@ -59,28 +60,20 @@ public class SpoutExecutorStats extends CommonStats {
         this.getFailed().incBy(stream, this.rate);
     }
 
-    public Map renderStats() {
+    public ExecutorStats renderStats() {
         cleanupStats();
-        Map ret = new HashMap();
-        ret.putAll(valueStats(CommonStats.COMMON_FIELDS));
-        ret.putAll(valueStats(SpoutExecutorStats.SPOUT_FIELDS));
-        StatsUtil.putKV(ret, StatsUtil.TYPE, StatsUtil.KW_SPOUT);
+
+        ExecutorStats ret = new ExecutorStats();
+        // common fields
+        ret.set_emitted(valueStat(EMITTED));
+        ret.set_transferred(valueStat(TRANSFERRED));
+        ret.set_rate(this.rate);
+
+        // spout stats
+        SpoutStats spoutStats = new SpoutStats(
+                valueStat(ACKED), valueStat(FAILED), 
valueStat(COMPLETE_LATENCIES));
+        ret.set_specific(ExecutorSpecificStats.spout(spoutStats));
 
         return ret;
     }
-
-//    public ExecutorStats renderStats() {
-//        cleanupStats();
-//
-//        ExecutorStats ret = new ExecutorStats();
-//        ret.set_emitted(valueStat(EMITTED));
-//        ret.set_transferred(valueStat(TRANSFERRED));
-//        ret.set_rate(this.rate);
-//
-//        SpoutStats spoutStats = new SpoutStats(
-//                valueStat(ACKED), valueStat(FAILED), 
valueStat(COMPLETE_LATENCIES));
-//        ret.set_specific(ExecutorSpecificStats.spout(spoutStats));
-//
-//        return ret;
-//    }
 }

Reply via email to