http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetricsReporter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetricsReporter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetricsReporter.java
index 489bec8..5039c87 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetricsReporter.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetricsReporter.java
@@ -17,390 +17,300 @@
  */
 package com.alibaba.jstorm.metric;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.Config;
-import backtype.storm.LocalCluster;
 import backtype.storm.generated.MetricInfo;
-import backtype.storm.generated.MetricWindow;
-import backtype.storm.generated.NettyMetric;
+import backtype.storm.generated.TopologyMetric;
 import backtype.storm.generated.WorkerUploadMetrics;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Values;
 import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
+import com.alibaba.jstorm.callback.AsyncLoopThread;
 import com.alibaba.jstorm.callback.RunnableCallback;
 import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.cluster.StormClusterState;
+import com.alibaba.jstorm.cluster.Common;
 import com.alibaba.jstorm.cluster.StormConfig;
-import com.alibaba.jstorm.common.metric.Gauge;
-import com.alibaba.jstorm.common.metric.MetricFilter;
-import com.alibaba.jstorm.common.metric.MetricRegistry;
-import com.alibaba.jstorm.common.metric.window.Metric;
+import com.alibaba.jstorm.common.metric.AsmMetric;
+import com.alibaba.jstorm.daemon.nimbus.NimbusData;
+import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable.Update;
 import com.alibaba.jstorm.daemon.worker.WorkerData;
-import com.alibaba.jstorm.utils.JStormUtils;
-import com.codahale.metrics.health.HealthCheck;
-import com.codahale.metrics.health.HealthCheckRegistry;
-
-public class JStormMetricsReporter extends RunnableCallback {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(JStormMetricsReporter.class);
-
-    private MetricRegistry workerMetrics = JStormMetrics.workerMetrics;
-    private Map<Integer, MetricRegistry> taskMetrics =
-            JStormMetrics.taskMetrics;
-    private MetricRegistry skipMetrics = JStormMetrics.skipMetrics;
+import com.alibaba.jstorm.utils.JStormServerUtils;
+import com.alibaba.jstorm.utils.TimeUtils;
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-    private JStormMetricFilter inputFilter;
+import java.util.*;
 
-    private JStormMetricFilter outputFilter;
+/**
+ * Reports metrics from a worker to the nimbus server. This class serves as an object in Worker/Nimbus/Supervisor.
+ * When in Worker, it reports data via netty transport; otherwise it reports via thrift.
+ * <p/>
+ * There are 2 threads:
+ * 1. flush thread: checks every 1 sec; when the current time is aligned to 1 min, flushes all metrics to snapshots
+ * 2. check meta thread: uses thrift to get metric ids from the nimbus server.
+ *
+ * @author Cody ([email protected])
+ * @since 2.0.5
+ */
+public class JStormMetricsReporter {
+    private static final Logger LOG = 
LoggerFactory.getLogger(JStormMetricsReporter.class);
 
     private Map conf;
-    private String topologyId;
-    private String supervisorId;
-    private int port;
-    private int frequence;
-
-    private StormClusterState clusterState;
-    private boolean localMode = false;
-    private NimbusClient client;
-
-    public JStormMetricsReporter(WorkerData workerData) {
-        this.conf = workerData.getStormConf();
-        this.topologyId = (String) conf.get(Config.TOPOLOGY_ID);
-        this.supervisorId = workerData.getSupervisorId();
-        this.port = workerData.getPort();
-        this.frequence = ConfigExtension.getWorkerMetricReportFrequency(conf);
-        this.clusterState = workerData.getZkCluster();
-
-        outputFilter = new JStormMetricFilter(MetricDef.OUTPUT_TAG);
-        inputFilter = new JStormMetricFilter(MetricDef.INPUT_TAG);
-        localMode = StormConfig.local_mode(conf);
-        LOG.info("Successfully start ");
-    }
+    protected String clusterName;
+    protected String topologyId;
+    protected String host;
+    protected int port;
 
-    protected boolean getMoreMetric(
-            Map<String, Map<String, MetricWindow>> extraMap,
-            JStormMetricFilter metricFilter, String metricFullName,
-            Map<Integer, Double> metricWindow) {
-        if (metricFilter.matches(metricFullName, null) == false) {
-            return false;
-        }
+    protected boolean localMode = false;
 
-        int pos = metricFullName.indexOf(MetricRegistry.NAME_SEPERATOR);
-        if (pos <= 0 || pos >= metricFullName.length() - 1) {
-            return false;
-        }
+    private AsyncLoopThread checkMetricMetaThread;
+    protected final int checkMetaThreadCycle;
 
-        String metricName = metricFullName.substring(0, pos);
-        String extraName = metricFullName.substring(pos + 1);
+    private AsyncLoopThread flushMetricThread;
+    protected final int flushMetricThreadCycle;
 
-        Map<String, MetricWindow> item = extraMap.get(metricName);
-        if (item == null) {
-            item = new HashMap<String, MetricWindow>();
-            extraMap.put(metricName, item);
-        }
+    private boolean test = false;
 
-        MetricWindow metricWindowThrift = new MetricWindow();
-        metricWindowThrift.set_metricWindow(metricWindow);
+    private boolean isInWorker = false;
 
-        item.put(extraName, metricWindowThrift);
+    private SpoutOutputCollector spoutOutput;
+    private OutputCollector boltOutput;
 
-        return true;
-    }
-    
-    protected void insertNettyMetrics(Map<String, MetricInfo> nettyMetricInfo, 
-                    Map<Integer, Double> snapshot,
-                    String metricFullName) {
-        int pos = metricFullName.indexOf(MetricRegistry.NAME_SEPERATOR);
-        if (pos < 0 || pos >= metricFullName.length() - 1) {
-            return ;
-        }
-        
-        String realHeader = metricFullName.substring(0, pos);
-        String nettyConnection = metricFullName.substring(pos + 1);
-        
-        MetricInfo metricInfo = nettyMetricInfo.get(nettyConnection);
-        if (metricInfo == null) {
-            metricInfo = MetricThrift.mkMetricInfo();
-            
-            nettyMetricInfo.put(nettyConnection, metricInfo);
-        }
-        
-        MetricThrift.insert(metricInfo, realHeader, snapshot);
-    }
-    
-    protected void insertMergeList(Map<String, List<Map<Integer, Double> > > 
mergeMap, 
-                    List<String> mergeList, 
-                    Map<Integer, Double> snapshot,
-                    String name) {
-        for (String tag : mergeList) {
-            if (name.startsWith(tag) == false) {
-                continue;
-            }
-            List<Map<Integer, Double> > list = mergeMap.get(tag);
-            if (list == null) {
-                list = new ArrayList<Map<Integer,Double>>();
-                mergeMap.put(tag, list);
-            }
-            
-            list.add(snapshot);
-            
-        }
-    }
-    
-    protected void doMergeList(MetricInfo workerMetricInfo, 
-                    Map<String, List<Map<Integer, Double> > > mergeMap) {
-        for (Entry<String, List<Map<Integer, Double> > > entry : 
mergeMap.entrySet()) {
-            String name = entry.getKey();
-            List<Map<Integer, Double>> list = entry.getValue();
-            
-            Map<Integer, Double> merged = JStormUtils.mergeMapList(list);
-            
-            MetricThrift.insert(workerMetricInfo, name, merged);
-        }
-    }
+    private boolean enableMetrics;
+    private NimbusClient client = null;
 
-    public MetricInfo computWorkerMetrics() {
-        MetricInfo workerMetricInfo = MetricThrift.mkMetricInfo();
-        Map<String, MetricInfo> nettyMetricInfo = new HashMap<String, 
MetricInfo>();
-
-        Map<String, List<Map<Integer, Double> > > mergeMap = 
-                new HashMap<String, List<Map<Integer,Double> > >();
-        List<String> mergeList = new ArrayList<String>();
-        mergeList.add(MetricDef.NETTY_CLI_SEND_SPEED);
-        
-        Map<String, Metric> workerMetricMap = workerMetrics.getMetrics();
-        for (Entry<String, Metric> entry : workerMetricMap.entrySet()) {
-            String name = entry.getKey();
-            Map<Integer, Double> snapshot = entry.getValue().getSnapshot();
-
-            if (MetricDef.isNettyDetails(name) == false) {
-                MetricThrift.insert(workerMetricInfo, name, snapshot);
-                continue;
-            }
-            
-            insertNettyMetrics(nettyMetricInfo, snapshot, name);
-            
-            insertMergeList(mergeMap, mergeList, snapshot, name);
-            
+    /**
+     * Creates a reporter bound to the daemon described by {@code role}.
+     * <p/>
+     * For a {@link WorkerData} the storm conf, topology id and port are taken from the worker and the reporter is
+     * marked as running inside a worker; for a {@link NimbusData} the nimbus conf is used and the topology id is
+     * {@code JStormMetrics.NIMBUS_METRIC_KEY}.
+     * <p/>
+     * NOTE(review): any other {@code role} leaves {@code conf} unset before it is handed to
+     * {@code StormConfig.local_mode(conf)} and {@code ConfigExtension.getClusterName(conf)} -- confirm that callers
+     * only ever pass WorkerData or NimbusData.
+     *
+     * @param role a {@link WorkerData} or {@link NimbusData} instance
+     */
+    public JStormMetricsReporter(Object role) {
+        LOG.info("starting jstorm metrics reporter");
+        if (role instanceof WorkerData) {
+            WorkerData workerData = (WorkerData) role;
+            this.conf = workerData.getStormConf();
+            this.topologyId = (String) conf.get(Config.TOPOLOGY_ID);
+            this.port = workerData.getPort();
+            this.isInWorker = true;
+        } else if (role instanceof NimbusData) {
+            NimbusData nimbusData = (NimbusData) role;
+            this.conf = nimbusData.getConf();
+            this.topologyId = JStormMetrics.NIMBUS_METRIC_KEY;
         }
-        
-        doMergeList(workerMetricInfo, mergeMap);
-        
-        JStormMetrics.setExposeWorkerMetrics(workerMetricInfo);
-        JStormMetrics.setExposeNettyMetrics(nettyMetricInfo);
-        return workerMetricInfo;
-    }
-
-    public boolean isTaskQueueFull(Metric metric,
-            Map<Integer, Double> snapshot, String name) {
-        if (metric instanceof Gauge) {
-            if (MetricDef.TASK_QUEUE_SET.contains(name)) {
-                for (Entry<Integer, Double> entry : snapshot.entrySet()) {
-                    if (entry.getValue() == MetricDef.FULL_RATIO) {
-                        return true;
-                    }
-                }
-            }
+        this.host = JStormMetrics.getHost();
+        this.enableMetrics = JStormMetrics.isEnabled();
+        if (!enableMetrics) {
+            LOG.warn("***** topology metrics is disabled! *****");
+        } else {
+            LOG.info("topology metrics is enabled.");
         }
 
-        return false;
-    }
-
-    public Map<Integer, MetricInfo> computeTaskMetrics() {
-        Map<Integer, MetricInfo> ret = new HashMap<Integer, MetricInfo>();
+        this.checkMetaThreadCycle = 30;
+        // flush metric snapshots when time is aligned, check every sec.
+        this.flushMetricThreadCycle = 1;
 
-        for (Entry<Integer, MetricRegistry> entry : taskMetrics.entrySet()) {
-            Integer taskId = entry.getKey();
-            MetricRegistry taskMetrics = entry.getValue();
+        LOG.info("check meta thread freq:{}, flush metrics thread freq:{}", checkMetaThreadCycle, flushMetricThreadCycle);
 
-            Map<String, Map<String, MetricWindow>> inputMap =
-                    new HashMap<String, Map<String, MetricWindow>>();
-            Map<String, Map<String, MetricWindow>> outputMap =
-                    new HashMap<String, Map<String, MetricWindow>>();
-
-            MetricInfo taskMetricInfo = MetricThrift.mkMetricInfo();
-            taskMetricInfo.set_inputMetric(inputMap);
-            taskMetricInfo.set_outputMetric(outputMap);
-            ret.put(taskId, taskMetricInfo);
-
-            for (Entry<String, Metric> metricEntry : taskMetrics.getMetrics()
-                    .entrySet()) {
-                String name = metricEntry.getKey();
-                Metric metric = metricEntry.getValue();
-                Map<Integer, Double> snapshot = metric.getSnapshot();
-
-                boolean isInput =
-                        getMoreMetric(inputMap, inputFilter, name, snapshot);
-                boolean isOutput =
-                        getMoreMetric(outputMap, outputFilter, name, snapshot);
-
-                if (isInput == false && isOutput == false) {
-                    MetricThrift.insert(taskMetricInfo, name, snapshot);
-                }
-            }
-
-            MetricThrift.merge(taskMetricInfo, inputMap);
-            MetricThrift.merge(taskMetricInfo, outputMap);
-
-        }
+        this.localMode = StormConfig.local_mode(conf);
+        this.clusterName = ConfigExtension.getClusterName(conf);
+        LOG.info("done.");
+    }
 
-        JStormMetrics.setExposeTaskMetrics(ret);
-        return ret;
+    /**
+     * Test-only constructor: marks the reporter as running under test and applies the same thread cycles as the
+     * regular constructor (flush check 1, meta check 30). No conf, topology id or host is set here.
+     */
+    @VisibleForTesting
+    JStormMetricsReporter() {
+        LOG.info("Successfully started jstorm metrics reporter for test.");
+        this.test = true;
+        this.flushMetricThreadCycle = 1;
+        this.checkMetaThreadCycle = 30;
     }
-    
-    public void healthCheck(Integer taskId, HealthCheckRegistry healthCheck) {
-       if (taskId == null) {
-               return ;
-       }
-       
-       final Map<String, HealthCheck.Result> results =
-                       healthCheck.runHealthChecks();
-        for (Entry<String, HealthCheck.Result> resultEntry : results
-                .entrySet()) {
-            HealthCheck.Result result = resultEntry.getValue();
-            if (result.isHealthy() == false) {
-                LOG.warn("{}:{}", taskId, result.getMessage());
-                try {
-                    clusterState.report_task_error(topologyId, taskId,
-                            result.getMessage());
-                } catch (Exception e) {
-                    // TODO Auto-generated catch block
-                    LOG.error(e.getMessage(), e);
-                }
 
-            }
+    /**
+     * Creates the check-meta and flush loop threads. Nothing is created in local mode or when metrics are
+     * disabled.
+     * <p/>
+     * NOTE(review): presumably {@code AsyncLoopThread} starts running its callback on construction -- confirm.
+     */
+    public void init() {
+        if (!localMode && enableMetrics) {
+            this.checkMetricMetaThread = new AsyncLoopThread(new CheckMetricMetaThread());
+            this.flushMetricThread = new AsyncLoopThread(new FlushMetricThread());
         }
     }
 
-    public void healthCheck() {
-        Integer firstTask = null;
-
-        Map<Integer, HealthCheckRegistry> taskHealthCheckMap =
-                JStormHealthCheck.getTaskhealthcheckmap();
-
-        for (Entry<Integer, HealthCheckRegistry> entry : taskHealthCheckMap
-                .entrySet()) {
-            Integer taskId = entry.getKey();
-            HealthCheckRegistry taskHealthCheck = entry.getValue();
-
-            healthCheck(taskId, taskHealthCheck);
+    /**
+     * Asks nimbus (over thrift) to register the given metric names for this topology.
+     *
+     * @param names metric names to register
+     * @return the name-to-id map returned by nimbus; an empty map when running under test or with metrics
+     *         disabled; {@code null} when the call failed
+     */
+    private Map<String, Long> registerMetrics(Set<String> names) {
+        if (test || !enableMetrics) {
+            return new HashMap<>();
+        }
+        try {
+            if (client == null) {
+                client = NimbusClient.getConfiguredClient(conf);
+            }
 
-            if (firstTask != null) {
-                firstTask = taskId;
+            return client.getClient().registerMetrics(topologyId, names);
+        } catch (Exception e) {
+            LOG.error("Failed to gen metric ids", e);
+            if (client != null) {
+                client.close();
+                // drop the broken client; it is re-created lazily on the next call, so a failing
+                // reconnect cannot throw out of this handler
+                client = null;
             }
         }
 
-        HealthCheckRegistry workerHealthCheck =
-                JStormHealthCheck.getWorkerhealthcheck();
-        healthCheck(firstTask, workerHealthCheck);
-        
-
+        return null;
     }
 
-    @Override
-    public void run() {
+    /**
+     * Cleans up both loop threads and closes the nimbus client.
+     * <p/>
+     * The threads only exist when {@link #init()} created them (not in local mode, metrics enabled), so each is
+     * checked for {@code null}; this also makes the call safe when {@code init()} was never invoked.
+     */
+    public void shutdown() {
+        if (checkMetricMetaThread != null) {
+            checkMetricMetaThread.cleanup();
+        }
+        if (flushMetricThread != null) {
+            flushMetricThread.cleanup();
+        }
+        // release the thrift connection opened by registerMetrics/uploadMetric
+        if (client != null) {
+            client.close();
+            client = null;
+        }
+    }
 
+    /**
+     * Computes all metrics via {@code JStormMetrics.computeAllMetrics()} and, if there is at least one, hands them
+     * to {@link #uploadMetric(WorkerUploadMetrics)}. Does nothing under test. Any exception is logged, not
+     * rethrown.
+     */
+    public void doUpload() {
+        if (test) {
+            return;
+        }
         try {
-            // TODO Auto-generated method stub
-            MetricInfo workerMetricInfo = computWorkerMetrics();
-            
-            Map<Integer, MetricInfo> taskMetricMap = computeTaskMetrics();
+            long start = System.currentTimeMillis();
+            MetricInfo workerMetricInfo = JStormMetrics.computeAllMetrics();
 
             WorkerUploadMetrics upload = new WorkerUploadMetrics();
-            upload.set_topology_id(topologyId);
-            upload.set_supervisor_id(supervisorId);
+            upload.set_topologyId(topologyId);
+            upload.set_supervisorId(host);
             upload.set_port(port);
-            upload.set_workerMetric(workerMetricInfo);
-            upload.set_nettyMetric(
-                            new NettyMetric(
-                                   JStormMetrics.getExposeNettyMetrics(), 
-                                   JStormMetrics.getExposeNettyMetrics().size()));
-            upload.set_taskMetric(taskMetricMap);
-            
-            uploadMetric(upload);
-            
-            healthCheck();
-
-            LOG.info("Successfully upload worker's metrics");
-            LOG.info(Utils.toPrettyJsonString(workerMetricInfo));
-            LOG.info(Utils.toPrettyJsonString(JStormMetrics.getExposeNettyMetrics()));
-            LOG.info(Utils.toPrettyJsonString(taskMetricMap));
+            upload.set_allMetrics(workerMetricInfo);
+
+            if (workerMetricInfo.get_metrics_size() > 0) {
+                uploadMetric(upload);
+                LOG.info("Successfully upload worker metrics, size:{}, cost:{}",
+                        workerMetricInfo.get_metrics_size(), System.currentTimeMillis() - start);
+            } else {
+                LOG.info("No metrics to upload.");
+            }
         } catch (Exception e) {
             LOG.error("Failed to upload worker metrics", e);
         }
-
     }
 
-    public void uploadMetric(WorkerUploadMetrics upload) {
-        if (StormConfig.local_mode(conf)) {
-            try {
-                byte[] temp = Utils.serialize(upload);
 
-                LocalCluster.getInstance().getLocalClusterMap().getNimbus()
-                        .workerUploadMetric(upload);
-            } catch (TException e) {
-                // TODO Auto-generated catch block
-                LOG.error("Failed to upload worker metrics", e);
+    /**
+     * Sends the given metrics towards nimbus.
+     * <p/>
+     * Inside a worker the metrics are emitted on {@code Common.TOPOLOGY_MASTER_METRICS_STREAM_ID} through the bolt
+     * collector if one is set, otherwise through the spout collector; if neither is set nothing is sent.
+     * Outside a worker the metrics are wrapped in a {@link TopologyMetric} and uploaded with the nimbus thrift
+     * client; on failure the error is logged and the client is closed and dropped.
+     *
+     * @param metrics the metrics to upload
+     */
+    public void uploadMetric(WorkerUploadMetrics metrics) {
+        if (isInWorker) {
+            // in Worker, we upload data via netty transport
+            if (boltOutput != null) {
+                LOG.info("emit metrics through bolt collector.");
+                boltOutput.emit(Common.TOPOLOGY_MASTER_METRICS_STREAM_ID,
+                        new Values(JStormServerUtils.getName(host, port), metrics));
+            } else if (spoutOutput != null) {
+                LOG.info("emit metrics through spout collector.");
+                spoutOutput.emit(Common.TOPOLOGY_MASTER_METRICS_STREAM_ID,
+                        new Values(JStormServerUtils.getName(host, port), metrics));
             }
         } else {
+            // in supervisor or nimbus, we upload metric data via thrift
+            LOG.info("emit metrics through nimbus client.");
+            TopologyMetric tpMetric = MetricUtils.mkTopologyMetric();
+            tpMetric.set_workerMetric(metrics.get_allMetrics());
+
             try {
-               if (client == null) {
-                       client = NimbusClient.getConfiguredClient(conf);
-               }
-                client.getClient().workerUploadMetric(upload);
-            } catch (Exception e) {
-                LOG.error("Failed to upload worker metrics", e);
+                if (client == null) {
+                    client = NimbusClient.getConfiguredClient(conf);
+                }
+                client.getClient().uploadTopologyMetrics(topologyId, tpMetric);
+            } catch (Exception ex) {
+                LOG.error("upload metric error:", ex);
                 if (client != null) {
                     client.close();
+                    // re-created lazily on the next upload, so a failing reconnect cannot throw from here
                     client = null;
                 }
-            } finally {
-                
             }
         }
+        //MetricUtils.logMetrics(metrics.get_allMetrics());
     }
 
-    @Override
-    public Object getResult() {
-        return frequence;
-    }
-    
-    @Override
-    public void shutdown() {
-       if (client != null) {
-            client.close();
-            client = null;
+
+    /**
+     * Stores the collector that {@code uploadMetric} emits through when running inside a worker.
+     * An {@link OutputCollector} is kept as the bolt collector, a {@link SpoutOutputCollector} as the spout
+     * collector; any other argument (including {@code null}) is ignored.
+     *
+     * @param outputCollector a bolt or spout output collector
+     */
+    public void setOutputCollector(Object outputCollector) {
+        if (outputCollector instanceof OutputCollector) {
+            this.boltOutput = (OutputCollector) outputCollector;
+        } else if (outputCollector instanceof SpoutOutputCollector) {
+            this.spoutOutput = (SpoutOutputCollector) outputCollector;
         }
+
     }
 
-    public static class JStormMetricFilter implements MetricFilter {
-        private static final long serialVersionUID = -8886536175626248855L;
-        private String[] tags;
 
-        public JStormMetricFilter(String[] tags) {
-            this.tags = tags;
+    /**
+     * Loop callback that, whenever {@code TimeUtils.isTimeAligned()} holds, flushes every metric of every registry
+     * in {@code JStormMetrics.allRegistries} and then calls {@link #doUpload()}. Exceptions are logged and
+     * swallowed so one failed round does not end the loop.
+     */
+    class FlushMetricThread extends RunnableCallback {
+        @Override
+        public void run() {
+            if (TimeUtils.isTimeAligned()) {
+                int cnt = 0;
+                try {
+                    for (AsmMetricRegistry registry : JStormMetrics.allRegistries) {
+                        // only the metric objects are needed here, not their names
+                        for (AsmMetric metric : registry.getMetrics().values()) {
+                            metric.flush();
+                            cnt++;
+                        }
+                    }
+                    LOG.info("flush metrics, total:{}.", cnt);
+
+                    doUpload();
+                } catch (Exception ex) {
+                    LOG.error("Error", ex);
+                }
+            }
+        }
+
+        /**
+         * @return {@code flushMetricThreadCycle}; presumably the loop interval used by AsyncLoopThread -- confirm
+         */
+        @Override
+        public Object getResult() {
+            return flushMetricThreadCycle;
         }
+    }
+
+    /**
+     * Loop callback that registers metrics which have no id yet.
+     * <p/>
+     * Each round collects the names of all metrics whose op has the {@code REPORT} bit set and whose metric id is
+     * still 0, passes them to {@code registerMetrics} and stores the returned ids on the matching metrics.
+     * Rounds are skipped until {@code initialDelay} (30 plus a random 0..14, compared against
+     * {@code TimeUtils.current_time_secs()}) has passed since construction, and while a previous round is still
+     * marked as processing.
+     */
+    class CheckMetricMetaThread extends RunnableCallback {
+        private volatile boolean processing = false;
+        private final long start = TimeUtils.current_time_secs();
+        private final long initialDelay = 30 + new Random().nextInt(15);
 
         @Override
-        public boolean matches(String name, Metric metric) {
-            // TODO Auto-generated method stub
-            for (String tag : tags) {
-                if (name.startsWith(tag)) {
-                    return true;
+        public void run() {
+            if (TimeUtils.current_time_secs() - start < initialDelay) {
+                return;
+            }
+
+            if (processing) {
+                LOG.info("still processing, skip...");
+            } else {
+                processing = true;
+                // named differently from the field 'start' (seconds) to avoid shadowing it
+                long beginMillis = System.currentTimeMillis();
+                try {
+                    Set<String> names = new HashSet<>();
+                    for (AsmMetricRegistry registry : JStormMetrics.allRegistries) {
+                        Map<String, AsmMetric> metricMap = registry.getMetrics();
+                        for (Map.Entry<String, AsmMetric> metricEntry : metricMap.entrySet()) {
+                            AsmMetric metric = metricEntry.getValue();
+                            if (((metric.getOp() & AsmMetric.MetricOp.REPORT) == AsmMetric.MetricOp.REPORT) &&
+                                    metric.getMetricId() == 0L) {
+                                names.add(metricEntry.getKey());
+                            }
+                        }
+                    }
+
+                    if (!names.isEmpty()) {
+                        Map<String, Long> nameIdMap = registerMetrics(names);
+                        if (nameIdMap != null) {
+                            for (Map.Entry<String, Long> idEntry : nameIdMap.entrySet()) {
+                                String name = idEntry.getKey();
+                                AsmMetric metric = JStormMetrics.find(name);
+                                if (metric != null) {
+                                    long id = idEntry.getValue();
+                                    metric.setMetricId(id);
+                                    LOG.info("set metric id, {}:{}", name, id);
+                                }
+                            }
+                        }
+                        LOG.info("register metrics, size:{}, cost:{}", names.size(), System.currentTimeMillis() - beginMillis);
+                    }
+                } catch (Exception ex) {
+                    LOG.error("Error", ex);
+                } finally {
+                    // always clear the flag, even when something other than an Exception escapes
+                    processing = false;
                 }
             }
-            return false;
         }
 
+        /**
+         * @return {@code checkMetaThreadCycle}; presumably the loop interval used by AsyncLoopThread -- confirm
+         */
+        @Override
+        public Object getResult() {
+            return checkMetaThreadCycle;
+        }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/KVSerializable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/KVSerializable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/KVSerializable.java
new file mode 100644
index 0000000..7adaf46
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/KVSerializable.java
@@ -0,0 +1,30 @@
+package com.alibaba.jstorm.metric;
+
+/**
+ * An object that exposes itself as a key/value pair of byte arrays and can be rebuilt from such a pair.
+ *
+ * @author Cody ([email protected])
+ * @since 2.0.5
+ */
+public interface KVSerializable {
+    // NOTE(review): presumably start/end markers used when building keys -- confirm against implementers
+    String START = "S", END = "E";
+    // NOTE(review): presumably the byte widths of a long and an int -- confirm against implementers
+    int LONG_SIZE = 8;
+    int INT_SIZE = 4;
+
+    /** @return the key bytes of this object */
+    byte[] getKey();
+
+    /** @return the value bytes of this object */
+    byte[] getValue();
+
+    /**
+     * Rebuilds an object from a key/value pair.
+     *
+     * @param key   key bytes
+     * @param value value bytes
+     * @return the rebuilt object (NOTE(review): receiver or a new instance -- left to implementers, confirm)
+     */
+    Object fromKV(byte[] key, byte[] value);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetaFilter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetaFilter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetaFilter.java
new file mode 100644
index 0000000..f39f51c
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetaFilter.java
@@ -0,0 +1,11 @@
+package com.alibaba.jstorm.metric;
+
+import com.alibaba.jstorm.common.metric.MetricMeta;
+
+/**
+ * Predicate over {@link MetricMeta}.
+ *
+ * @author Cody ([email protected])
+ * @since 2.0.5
+ */
+public interface MetaFilter {
+    /**
+     * Tests a metric meta against this filter.
+     *
+     * @param meta the metric meta to test
+     * @param arg  extra argument for the test; its meaning is defined by the implementation
+     * @return {@code true} if {@code meta} matches
+     */
+    boolean matches(MetricMeta meta, Object arg);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetaType.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetaType.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetaType.java
new file mode 100644
index 0000000..3dfe665
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetaType.java
@@ -0,0 +1,58 @@
+package com.alibaba.jstorm.metric;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Meta types, each carrying an int code ({@link #getT()}) and a one-letter string code ({@link #getV()}).
+ *
+ * @author Cody ([email protected])
+ * @since 2.0.5
+ */
+public enum MetaType {
+    TASK(1, "T"), COMPONENT(2, "C"), STREAM(3, "S"), WORKER(4, "W"), TOPOLOGY(5, "P"), NETTY(6, "N"), NIMBUS(7, "M");
+
+    private final int t;
+    private final String v;
+
+    MetaType(int t, String v) {
+        this.t = t;
+        this.v = v;
+    }
+
+    // reverse lookup tables, filled once by the static initializer below
+    private static final Map<String, MetaType> valueMap = new HashMap<String, MetaType>();
+    private static final Map<Integer, MetaType> typeMap = new HashMap<Integer, MetaType>();
+
+    static {
+        for (MetaType type : MetaType.values()) {
+            typeMap.put(type.getT(), type);
+            valueMap.put(type.getV(), type);
+        }
+    }
+
+    /** @return the string code of this type */
+    public String getV() {
+        return this.v;
+    }
+
+    /** @return the int code of this type */
+    public int getT() {
+        return t;
+    }
+
+    /** Looks up a type by its string code given as a single char; returns {@code null} if there is none. */
+    public static MetaType parse(char ch) {
+        return parse(String.valueOf(ch));
+    }
+
+    /** Looks up a type by its string code; returns {@code null} if there is none. */
+    public static MetaType parse(String v) {
+        return valueMap.get(v);
+    }
+
+    /** Looks up a type by its int code; returns {@code null} if there is none. */
+    public static MetaType parse(int t) {
+        return typeMap.get(t);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricClient.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricClient.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricClient.java
new file mode 100644
index 0000000..1d63f46
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricClient.java
@@ -0,0 +1,121 @@
+package com.alibaba.jstorm.metric;
+
+import backtype.storm.task.TopologyContext;
+import com.alibaba.jstorm.common.metric.*;
+import com.codahale.metrics.Gauge;
+
+/**
+ * metric client for end users to add custom metrics.
+ * <p/>
+ * Every metric is registered as a task metric whose full name is built from the topology id, component id and
+ * task id of the {@link TopologyContext} passed to the constructor, plus the group, name and metric type.
+ *
+ * @author Cody ([email protected])
+ * @since 2.0.5
+ */
+@SuppressWarnings("unused")
+public class MetricClient {
+    // group used by the overloads that take no explicit group
+    private static final String GROUP_UDF = "udf";
+
+    private final String topologyId;
+    private final String componentId;
+    private final int taskId;
+
+    /**
+     * @param context supplies the topology id, component id and task id used in metric names
+     */
+    public MetricClient(TopologyContext context) {
+        taskId = context.getThisTaskId();
+        this.topologyId = context.getTopologyId();
+        this.componentId = context.getThisComponentId();
+    }
+
+    /** Registers a gauge in the default group; see {@link #registerGauge(String, String, Gauge)}. */
+    public AsmGauge registerGauge(String name, Gauge<Double> gauge) {
+        return registerGauge(name, GROUP_UDF, gauge);
+    }
+
+    /**
+     * Wraps {@code gauge} in an {@link AsmGauge} and registers it as a task metric.
+     * <p/>
+     * NOTE(review): this method and {@link #registerCounter(String, String)} return the instance created here,
+     * while the meter/timer/histogram variants return what {@code JStormMetrics.registerTaskMetric} hands back --
+     * confirm whether the two can differ (e.g. on duplicate names).
+     *
+     * @return the newly created gauge wrapper
+     */
+    public AsmGauge registerGauge(String name, String group, Gauge<Double> gauge) {
+        String userMetricName = getMetricName(name, group, MetricType.GAUGE);
+        AsmGauge asmGauge = new AsmGauge(gauge);
+        JStormMetrics.registerTaskMetric(userMetricName, asmGauge);
+        return asmGauge;
+    }
+
+    /** Registers a counter in the default group; see {@link #registerCounter(String, String)}. */
+    public AsmCounter registerCounter(String name) {
+        return registerCounter(name, GROUP_UDF);
+    }
+
+    /** Creates an {@link AsmCounter}, registers it as a task metric and returns the newly created counter. */
+    public AsmCounter registerCounter(String name, String group) {
+        String userMetricName = getMetricName(name, group, MetricType.COUNTER);
+        AsmCounter counter = new AsmCounter();
+        JStormMetrics.registerTaskMetric(userMetricName, counter);
+        return counter;
+    }
+
+    /** Registers a meter in the default group; see {@link #registerMeter(String, String)}. */
+    public AsmMeter registerMeter(String name) {
+        return registerMeter(name, GROUP_UDF);
+    }
+
+    /** Registers a new {@link AsmMeter} as a task metric and returns what the registration call returns. */
+    public AsmMeter registerMeter(String name, String group) {
+        String userMetricName = getMetricName(name, group, MetricType.METER);
+        return (AsmMeter) JStormMetrics.registerTaskMetric(userMetricName, new AsmMeter());
+    }
+
+    /** Registers a timer in the default group; see {@link #registerTimer(String, String)}. */
+    public AsmTimer registerTimer(String name) {
+        return registerTimer(name, GROUP_UDF);
+    }
+
+    /** Registers a new {@link AsmTimer} as a task metric and returns what the registration call returns. */
+    public AsmTimer registerTimer(String name, String group) {
+        String userMetricName = getMetricName(name, group, MetricType.TIMER);
+        return (AsmTimer) JStormMetrics.registerTaskMetric(userMetricName, new AsmTimer());
+    }
+
+    /** Registers a histogram in the default group; see {@link #registerHistogram(String, String)}. */
+    public AsmHistogram registerHistogram(String name) {
+        return registerHistogram(name, GROUP_UDF);
+    }
+
+    /** Registers a new {@link AsmHistogram} as a task metric and returns what the registration call returns. */
+    public AsmHistogram registerHistogram(String name, String group) {
+        String userMetricName = getMetricName(name, group, MetricType.HISTOGRAM);
+        return (AsmHistogram) JStormMetrics.registerTaskMetric(userMetricName, new AsmHistogram());
+    }
+
+    /** Unregisters a metric of the default group; see {@link #unregister(String, String, MetricType)}. */
+    public void unregister(String name, MetricType type) {
+        unregister(name, GROUP_UDF, type);
+    }
+
+    /** Unregisters the task metric identified by name, group and type. */
+    public void unregister(String name, String group, MetricType type) {
+        String userMetricName = getMetricName(name, group, type);
+        JStormMetrics.unregisterTaskMetric(userMetricName);
+    }
+
+    // full metric name for a metric in the default group
+    private String getMetricName(String name, MetricType type) {
+        return getMetricName(name, GROUP_UDF, type);
+    }
+
+    // full metric name built from this client's topology/component/task ids plus group, name and type
+    private String getMetricName(String name, String group, MetricType type) {
+        return MetricUtils.taskMetricName(topologyId, componentId, taskId, group, name, type);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricDataConverter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricDataConverter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricDataConverter.java
new file mode 100644
index 0000000..cdb47f3
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricDataConverter.java
@@ -0,0 +1,87 @@
+package com.alibaba.jstorm.metric;
+
+import backtype.storm.generated.MetricSnapshot;
+import com.alibaba.jstorm.common.metric.*;
+import com.alibaba.jstorm.utils.TimeUtils;
+
+import java.util.Date;
+
+/**
+ * Static helpers that copy the fields of a thrift {@link MetricSnapshot} into the
+ * matching {@code *Data} object (counter, gauge, meter, histogram or timer).
+ * Every conversion first fills the common fields through {@code convertBase}.
+ *
+ * @author Cody ([email protected])
+ * @since 2.0.5
+ */
+public class MetricDataConverter {
+
+    /**
+     * Converts a snapshot into a {@code CounterData}; the value is the snapshot's long value.
+     *
+     * @param snapshot source snapshot
+     * @param win      metric window, stored on the result and used to align the timestamp
+     * @return the populated counter data
+     */
+    public static CounterData toCounterData(MetricSnapshot snapshot, int win) {
+        CounterData data = new CounterData();
+        convertBase(snapshot, data, win);
+
+        data.setV(snapshot.get_longValue());
+        return data;
+    }
+
+    /**
+     * Converts a snapshot into a {@code GaugeData}; the value is the snapshot's double value.
+     *
+     * @param snapshot source snapshot
+     * @param win      metric window, stored on the result and used to align the timestamp
+     * @return the populated gauge data
+     */
+    public static GaugeData toGaugeData(MetricSnapshot snapshot, int win) {
+        GaugeData data = new GaugeData();
+        convertBase(snapshot, data, win);
+
+        data.setV(snapshot.get_doubleValue());
+        return data;
+    }
+
+    /**
+     * Converts a snapshot into a {@code MeterData}, copying m1, m5, m15 and mean.
+     *
+     * @param snapshot source snapshot
+     * @param win      metric window, stored on the result and used to align the timestamp
+     * @return the populated meter data
+     */
+    public static MeterData toMeterData(MetricSnapshot snapshot, int win) {
+        MeterData data = new MeterData();
+        convertBase(snapshot, data, win);
+
+        data.setM1(snapshot.get_m1());
+        data.setM5(snapshot.get_m5());
+        data.setM15(snapshot.get_m15());
+        data.setMean(snapshot.get_mean());
+
+        return data;
+    }
+
+    /**
+     * Converts a snapshot into a {@code HistogramData}, copying min, max, the
+     * p50/p75/p95/p98/p99/p999 percentiles and mean.
+     *
+     * @param snapshot source snapshot
+     * @param win      metric window, stored on the result and used to align the timestamp
+     * @return the populated histogram data
+     */
+    public static HistogramData toHistogramData(MetricSnapshot snapshot, int 
win) {
+        HistogramData data = new HistogramData();
+        convertBase(snapshot, data, win);
+
+        data.setMin(snapshot.get_min());
+        data.setMax(snapshot.get_max());
+        data.setP50(snapshot.get_p50());
+        data.setP75(snapshot.get_p75());
+        data.setP95(snapshot.get_p95());
+        data.setP98(snapshot.get_p98());
+        data.setP99(snapshot.get_p99());
+        data.setP999(snapshot.get_p999());
+        data.setMean(snapshot.get_mean());
+
+        return data;
+    }
+
+    /**
+     * Converts a snapshot into a {@code TimerData}: the histogram fields (min, max,
+     * percentiles, mean) plus the m1/m5/m15 values.
+     *
+     * @param snapshot source snapshot
+     * @param win      metric window, stored on the result and used to align the timestamp
+     * @return the populated timer data
+     */
+    public static TimerData toTimerData(MetricSnapshot snapshot, int win) {
+        TimerData data = new TimerData();
+        convertBase(snapshot, data, win);
+
+        data.setMin(snapshot.get_min());
+        data.setMax(snapshot.get_max());
+        data.setP50(snapshot.get_p50());
+        data.setP75(snapshot.get_p75());
+        data.setP95(snapshot.get_p95());
+        data.setP98(snapshot.get_p98());
+        data.setP99(snapshot.get_p99());
+        data.setP999(snapshot.get_p999());
+        data.setMean(snapshot.get_mean());
+        data.setM1(snapshot.get_m1());
+        data.setM5(snapshot.get_m5());
+        data.setM15(snapshot.get_m15());
+
+        return data;
+    }
+
+    /**
+     * Fills the fields shared by all data types: window, metric id and timestamp.
+     * The timestamp is the snapshot's ts after {@code TimeUtils.alignTimeToWin(ts, win)}.
+     * NOTE(review): the exact alignment rule lives in TimeUtils and is not visible here.
+     */
+    private static void convertBase(MetricSnapshot snapshot, MetricBaseData 
data, int win) {
+        long newTs = TimeUtils.alignTimeToWin(snapshot.get_ts(), win);
+        data.setWin(win);
+        data.setMetricId(snapshot.get_metricId());
+        data.setTs(new Date(newTs));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricDef.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricDef.java 
b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricDef.java
index 58413bb..c060e51 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricDef.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricDef.java
@@ -17,9 +17,7 @@
  */
 package com.alibaba.jstorm.metric;
 
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 public class MetricDef {
@@ -28,10 +26,8 @@ public class MetricDef {
     public static final String TIME_TYPE = "Time";
 
     public static final String DESERIALIZE_THREAD = "Deserialize";
-    public static final String DESERIALIZE_QUEUE = DESERIALIZE_THREAD
-            + QUEUE_TYPE;
-    public static final String DESERIALIZE_TIME = DESERIALIZE_THREAD
-            + TIME_TYPE;
+    public static final String DESERIALIZE_QUEUE = DESERIALIZE_THREAD + 
QUEUE_TYPE;
+    public static final String DESERIALIZE_TIME = DESERIALIZE_THREAD + 
TIME_TYPE;
 
     public static final String SERIALIZE_THREAD = "Serialize";
     public static final String SERIALIZE_QUEUE = SERIALIZE_THREAD + QUEUE_TYPE;
@@ -45,54 +41,46 @@ public class MetricDef {
     public static final String EMPTY_CPU_RATIO = "EmptyCpuRatio";
     public static final String PENDING_MAP = "PendingNum";
     public static final String COLLECTOR_EMIT_TIME = "EmitTime";
-
-   
+    public static final String TUPLE_LIEF_CYCLE = "TupleLifeCycle";
 
     public static final String DISPATCH_THREAD = "VirtualPortDispatch";
     public static final String DISPATCH_QUEUE = DISPATCH_THREAD + QUEUE_TYPE;
     public static final String DISPATCH_TIME = DISPATCH_THREAD + TIME_TYPE;
 
     public static final String BATCH_DRAINER_THREAD = "BatchDrainer";
-    public static final String BATCH_DRAINER_QUEUE = BATCH_DRAINER_THREAD
-            + QUEUE_TYPE;
-    public static final String BATCH_DRAINER_TIME = BATCH_DRAINER_THREAD
-            + TIME_TYPE;
+    public static final String BATCH_DRAINER_QUEUE = BATCH_DRAINER_THREAD + 
QUEUE_TYPE;
+    public static final String BATCH_DRAINER_TIME = BATCH_DRAINER_THREAD + 
TIME_TYPE;
 
     public static final String DRAINER_THREAD = "Drainer";
     public static final String DRAINER_QUEUE = DRAINER_THREAD + QUEUE_TYPE;
     public static final String DRAINER_TIME = DRAINER_THREAD + TIME_TYPE;
 
-    
     public static final String NETWORK_MSG_DECODE_TIME = 
"NetworkMsgDecodeTime";
-    
+
     // all tag start with "Netty" will be specially display in Web UI
     public static final String NETTY = "Netty";
     public static final String NETTY_CLI = NETTY + "Client";
     public static final String NETTY_SRV = NETTY + "Server";
     public static final String NETTY_CLI_SEND_SPEED = NETTY_CLI + "SendSpeed";
     public static final String NETTY_SRV_RECV_SPEED = NETTY_SRV + "RecvSpeed";
-    
+
     public static final String NETTY_CLI_SEND_TIME = NETTY_CLI + "SendTime";
-    public static final String NETTY_CLI_BATCH_SIZE =
-            NETTY_CLI + "SendBatchSize";
-    public static final String NETTY_CLI_SEND_PENDING =
-            NETTY_CLI + "SendPendings";
-    public static final String NETTY_CLI_SYNC_BATCH_QUEUE =
-            NETTY_CLI + "SyncBatchQueue";
-    public static final String NETTY_CLI_SYNC_DISR_QUEUE =
-            NETTY_CLI + "SyncDisrQueue";
+    public static final String NETTY_CLI_BATCH_SIZE = NETTY_CLI + 
"SendBatchSize";
+    public static final String NETTY_CLI_SEND_PENDING = NETTY_CLI + 
"SendPendings";
+    public static final String NETTY_CLI_SYNC_BATCH_QUEUE = NETTY_CLI + 
"SyncBatchQueue";
+    public static final String NETTY_CLI_SYNC_DISR_QUEUE = NETTY_CLI + 
"SyncDisrQueue";
     public static final String NETTY_CLI_CACHE_SIZE = NETTY_CLI + "CacheSize";
     public static final String NETTY_CLI_CONNECTION = NETTY_CLI + 
"ConnectionCheck";
-    
+
     // metric name for worker
     public static final String NETTY_SRV_MSG_TRANS_TIME = NETTY_SRV + 
"TransmitTime";
-    
 
     public static final String ZMQ_SEND_TIME = "ZMQSendTime";
     public static final String ZMQ_SEND_MSG_SIZE = "ZMQSendMSGSize";
 
     public static final String CPU_USED_RATIO = "CpuUsedRatio";
     public static final String MEMORY_USED = "MemoryUsed";
+    public static final String DISK_USAGE = "DiskUsage";
 
     public static final String REMOTE_CLI_ADDR = "RemoteClientAddress";
     public static final String REMOTE_SERV_ADDR = "RemoteServerAddress";
@@ -105,10 +93,10 @@ public class MetricDef {
     public static final String PROCESS_LATENCY = "ProcessLatency";
 
     public static final String[] OUTPUT_TAG = { EMMITTED_NUM, SEND_TPS };
-    public static final String[] INPUT_TAG = { RECV_TPS, ACKED_NUM, FAILED_NUM,
-            PROCESS_LATENCY };
+    public static final String[] INPUT_TAG = { RECV_TPS, ACKED_NUM, 
FAILED_NUM, PROCESS_LATENCY };
 
     public static final Set<String> MERGE_SUM_TAG = new HashSet<String>();
+
     static {
         MERGE_SUM_TAG.add(MetricDef.EMMITTED_NUM);
         MERGE_SUM_TAG.add(MetricDef.SEND_TPS);
@@ -119,6 +107,7 @@ public class MetricDef {
     }
 
     public static final Set<String> MERGE_AVG_TAG = new HashSet<String>();
+
     static {
         MERGE_AVG_TAG.add(PROCESS_LATENCY);
     }
@@ -128,6 +117,7 @@ public class MetricDef {
     public static final String QEUEU_IS_FULL = "queue is full";
 
     public static final Set<String> TASK_QUEUE_SET = new HashSet<String>();
+
     static {
         TASK_QUEUE_SET.add(DESERIALIZE_QUEUE);
         TASK_QUEUE_SET.add(SERIALIZE_QUEUE);
@@ -136,27 +126,28 @@ public class MetricDef {
     }
 
     public static final Set<String> WORKER_QUEUE_SET = new HashSet<String>();
+
     static {
         WORKER_QUEUE_SET.add(DISPATCH_QUEUE);
         WORKER_QUEUE_SET.add(BATCH_DRAINER_QUEUE);
         WORKER_QUEUE_SET.add(DRAINER_QUEUE);
     }
-    
-    
+
     public static final int NETTY_METRICS_PACKAGE_SIZE = 200;
+
     public static boolean isNettyDetails(String metricName) {
-        
+
         Set<String> specialNettySet = new HashSet<String>();
         specialNettySet.add(MetricDef.NETTY_CLI_SEND_SPEED);
         specialNettySet.add(MetricDef.NETTY_SRV_RECV_SPEED);
-        
+
         if (specialNettySet.contains(metricName)) {
             return false;
         }
         if (metricName.startsWith(MetricDef.NETTY)) {
             return true;
         }
-        
+
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricIDGenerator.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricIDGenerator.java 
b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricIDGenerator.java
new file mode 100644
index 0000000..3df7bab
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricIDGenerator.java
@@ -0,0 +1,9 @@
+package com.alibaba.jstorm.metric;
+
+/**
+ * Strategy interface for producing a numeric id for a metric name.
+ *
+ * @author Cody ([email protected])
+ * @since 2.0.5
+ */
+public interface MetricIDGenerator {
+    /**
+     * Generates the id for the given metric name.
+     * NOTE(review): uniqueness/stability guarantees depend on the implementation; none
+     * are stated here.
+     *
+     * @param metricName full metric name
+     * @return the metric id
+     */
+    long genMetricId(String metricName);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricJstack.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricJstack.java 
b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricJstack.java
index 2989196..16e6357 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricJstack.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricJstack.java
@@ -55,8 +55,7 @@ public class MetricJstack implements Gauge<String> {
             writer.append("\n");
         }
 
-        long[] deadLockMonitorTids =
-                threadMXBean.findMonitorDeadlockedThreads();
+        long[] deadLockMonitorTids = 
threadMXBean.findMonitorDeadlockedThreads();
         if (deadLockMonitorTids != null) {
             writer.append(threadIds.length + " deadlocked monitor threads:");
             for (long tid : deadLockMonitorTids) {
@@ -66,60 +65,38 @@ public class MetricJstack implements Gauge<String> {
         }
 
         for (long tid : threadIds) {
-            ThreadInfo info =
-                    threadMXBean.getThreadInfo(tid, Integer.MAX_VALUE);
+            ThreadInfo info = threadMXBean.getThreadInfo(tid, 
Integer.MAX_VALUE);
             if (info == null) {
                 writer.append("  Inactive").append("\n");
                 continue;
             }
-            writer.append(
-                    "Thread "
-                            + getTaskName(info.getThreadId(),
-                                    info.getThreadName()) + ":").append("\n");
+            writer.append("Thread " + getTaskName(info.getThreadId(), 
info.getThreadName()) + ":").append("\n");
             Thread.State state = info.getThreadState();
             writer.append("  State: " + state).append("\n");
-            writer.append("  Blocked count: " + info.getBlockedCount()).append(
-                    "\n");
-            writer.append("  Waited count: " + info.getWaitedCount()).append(
-                    "\n");
-            writer.append(" Cpu time:")
-                    .append(threadMXBean.getThreadCpuTime(tid) / 1000000)
-                    .append("ms").append("\n");
-            writer.append(" User time:")
-                    .append(threadMXBean.getThreadUserTime(tid) / 1000000)
-                    .append("ms").append("\n");
+            writer.append("  Blocked count: " + 
info.getBlockedCount()).append("\n");
+            writer.append("  Waited count: " + 
info.getWaitedCount()).append("\n");
+            writer.append(" Cpu 
time:").append(threadMXBean.getThreadCpuTime(tid) / 
1000000).append("ms").append("\n");
+            writer.append(" User 
time:").append(threadMXBean.getThreadUserTime(tid) / 
1000000).append("ms").append("\n");
             if (contention) {
-                writer.append("  Blocked time: " + info.getBlockedTime())
-                        .append("\n");
-                writer.append("  Waited time: " + info.getWaitedTime()).append(
-                        "\n");
+                writer.append("  Blocked time: " + 
info.getBlockedTime()).append("\n");
+                writer.append("  Waited time: " + 
info.getWaitedTime()).append("\n");
             }
             if (state == Thread.State.WAITING) {
-                writer.append("  Waiting on " + info.getLockName())
-                        .append("\n");
+                writer.append("  Waiting on " + 
info.getLockName()).append("\n");
             } else if (state == Thread.State.BLOCKED) {
-                writer.append("  Blocked on " + info.getLockName())
-                        .append("\n");
-                writer.append(
-                        "  Blocked by "
-                                + getTaskName(info.getLockOwnerId(),
-                                        info.getLockOwnerName())).append("\n");
+                writer.append("  Blocked on " + 
info.getLockName()).append("\n");
+                writer.append("  Blocked by " + 
getTaskName(info.getLockOwnerId(), info.getLockOwnerName())).append("\n");
             }
 
         }
         for (long tid : threadIds) {
-            ThreadInfo info =
-                    threadMXBean.getThreadInfo(tid, Integer.MAX_VALUE);
+            ThreadInfo info = threadMXBean.getThreadInfo(tid, 
Integer.MAX_VALUE);
             if (info == null) {
                 writer.append("  Inactive").append("\n");
                 continue;
             }
 
-            writer.append(
-                    "Thread "
-                            + getTaskName(info.getThreadId(),
-                                    info.getThreadName()) + ": Stack").append(
-                    "\n");
+            writer.append("Thread " + getTaskName(info.getThreadId(), 
info.getThreadName()) + ": Stack").append("\n");
             for (StackTraceElement frame : info.getStackTrace()) {
                 writer.append("    " + frame.toString()).append("\n");
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricQueryClient.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricQueryClient.java 
b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricQueryClient.java
new file mode 100644
index 0000000..c8a8f56
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricQueryClient.java
@@ -0,0 +1,148 @@
+package com.alibaba.jstorm.metric;
+
+import com.alibaba.jstorm.common.metric.MetricMeta;
+import com.alibaba.jstorm.common.metric.TaskTrack;
+import com.alibaba.jstorm.common.metric.TopologyHistory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Metric query client for reading metric meta and metric data, and for deleting metric meta.
+ *
+ * @author Cody ([email protected])
+ * @since 2.0.5
+ */
+public interface MetricQueryClient {
+
+    /**
+     * Initializes the metric query client.
+     *
+     * @param conf configuration map (expected keys are implementation specific)
+     */
+    void init(Map conf);
+
+    /**
+     * Gets metric meta with an optional meta filter.
+     *
+     * @param clusterName cluster name
+     * @param topologyId  topology id
+     * @param type        meta type
+     * @param filter      meta filter; if the filter matches, the corresponding meta will be returned
+     * @param arg         filter argument
+     * @return meta list
+     */
+    List<MetricMeta> getMetricMeta(String clusterName, String topologyId, 
MetaType type, MetaFilter filter, Object arg);
+
+    /**
+     * Gets metric meta by topology id and meta type.
+     *
+     * @param clusterName cluster name
+     * @param topologyId  topology id
+     * @param type        meta type
+     * @return all metric meta
+     */
+    List<MetricMeta> getMetricMeta(String clusterName, String topologyId, 
MetaType type);
+
+    /**
+     * Gets worker metric meta by topology id.
+     *
+     * @param clusterName cluster name
+     * @param topologyId  topology id
+     * @return all worker metric meta
+     */
+    List<MetricMeta> getWorkerMeta(String clusterName, String topologyId);
+
+    /**
+     * Gets netty metric meta by topology id.
+     *
+     * @param clusterName cluster name
+     * @param topologyId  topology id
+     * @return all netty metric meta
+     */
+    List<MetricMeta> getNettyMeta(String clusterName, String topologyId);
+
+    /**
+     * Gets task metric meta.
+     *
+     * @param clusterName cluster name
+     * @param topologyId  topology id
+     * @param taskId      task id
+     * @return task metric meta
+     */
+    List<MetricMeta> getTaskMeta(String clusterName, String topologyId, int 
taskId);
+
+    /**
+     * Gets component metric meta.
+     *
+     * @param clusterName cluster name
+     * @param topologyId  topology id
+     * @param componentId component id
+     * @return component metric meta
+     */
+    List<MetricMeta> getComponentMeta(String clusterName, String topologyId, 
String componentId);
+
+    /**
+     * Gets a single metric meta by id.
+     *
+     * @param clusterName cluster name
+     * @param topologyId  topology id
+     * @param metaType    meta type
+     * @param metricId    metric id
+     * @return metric meta
+     */
+    MetricMeta getMetricMeta(String clusterName, String topologyId, MetaType 
metaType, long metricId);
+
+    /**
+     * Gets metric data.
+     *
+     * @param metricId   metric id
+     * @param metricType metric type
+     * @param win        metric window
+     * @param start      start time
+     * @param end        end time
+     * @return metric data objects; depending on the metric type these could be
+     *         CounterData, GaugeData, etc.
+     */
+    List<Object> getMetricData(long metricId, MetricType metricType, int win, 
long start, long end);
+
+    /**
+     * Gets all task tracks of a topology.
+     *
+     * @param clusterName cluster name
+     * @param topologyId  topology id
+     * @return task track
+     */
+    List<TaskTrack> getTaskTrack(String clusterName, String topologyId);
+
+    /**
+     * Gets the task tracks of a single task.
+     *
+     * @param clusterName cluster name
+     * @param topologyId  topology id
+     * @param taskId      task id
+     * @return task track
+     */
+    List<TaskTrack> getTaskTrack(String clusterName, String topologyId, int 
taskId);
+
+    /**
+     * Gets topology history.
+     *
+     * @param clusterName  cluster name
+     * @param topologyName topology name
+     * @param size         size
+     * @return topology history list
+     */
+    List<TopologyHistory> getTopologyHistory(String clusterName, String 
topologyName, int size);
+
+    /**
+     * Deletes a metric meta. Note that clusterName, topologyId, metaType and id
+     * must be set on the given meta.
+     *
+     * @param meta metric meta
+     */
+    void deleteMeta(MetricMeta meta);
+
+    /**
+     * Deletes a list of metric meta. Note that clusterName, topologyId, metaType and
+     * id must be set on every element.
+     *
+     * @param metaList metric meta list
+     */
+    void deleteMeta(List<MetricMeta> metaList);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricSendClient.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricSendClient.java 
b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricSendClient.java
deleted file mode 100755
index e1313f6..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricSendClient.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.alibaba.jstorm.metric;
-
-import java.util.List;
-import java.util.Map;
-
-public class MetricSendClient {
-
-    public MetricSendClient() {
-    }
-
-    public boolean send(Map<String, Object> msg) {
-        return true;
-    }
-
-    public boolean send(List<Map<String, Object>> msgList) {
-        return true;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricThrift.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricThrift.java 
b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricThrift.java
deleted file mode 100755
index 5286a1c..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricThrift.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.metric;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.generated.MetricInfo;
-import backtype.storm.generated.MetricWindow;
-
-import com.alibaba.jstorm.utils.JStormUtils;
-
-public class MetricThrift {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(MetricThrift.class);
-
-    public static MetricInfo mkMetricInfo() {
-        MetricInfo metricInfo =
-                new MetricInfo(new HashMap<String, MetricWindow>());
-
-        return metricInfo;
-    }
-
-    public static void insert(MetricInfo metricInfo, String key,
-            Map<Integer, Double> windowSet) {
-        MetricWindow metricWindow = new MetricWindow();
-        metricWindow.set_metricWindow(windowSet);
-
-        metricInfo.put_to_baseMetric(key, metricWindow);
-
-    }
-
-    public static MetricWindow merge(Map<String, MetricWindow> details) {
-        Map<Integer, Double> merge = new HashMap<Integer, Double>();
-
-        for (Entry<String, MetricWindow> entry : details.entrySet()) {
-            MetricWindow metricWindow = entry.getValue();
-            Map<Integer, Double> metric = metricWindow.get_metricWindow();
-
-            for (Entry<Integer, Double> metricEntry : metric.entrySet()) {
-                Integer key = metricEntry.getKey();
-                try {
-                    Double value =
-                            ((Number) JStormUtils.add(metricEntry.getValue(),
-                                    merge.get(key))).doubleValue();
-
-                    merge.put(key, value);
-                } catch (Exception e) {
-                    LOG.error("Invalid type of " + entry.getKey() + ":" + key,
-                            e);
-                    continue;
-                }
-            }
-        }
-
-        MetricWindow ret = new MetricWindow();
-
-        ret.set_metricWindow(merge);
-        return ret;
-    }
-
-    public static void merge(MetricInfo metricInfo,
-            Map<String, Map<String, MetricWindow>> extraMap) {
-        for (Entry<String, Map<String, MetricWindow>> entry : extraMap
-                .entrySet()) {
-            String metricName = entry.getKey();
-
-            metricInfo.put_to_baseMetric(metricName, merge(entry.getValue()));
-
-        }
-    }
-
-    public static MetricWindow mergeMetricWindow(MetricWindow fromMetric,
-            MetricWindow toMetric) {
-        if (toMetric == null) {
-            toMetric = new MetricWindow(new HashMap<Integer, Double>());
-        }
-
-        if (fromMetric == null) {
-            return toMetric;
-        }
-
-        List<Map<Integer, Double>> list = new ArrayList<Map<Integer, 
Double>>();
-        list.add(fromMetric.get_metricWindow());
-        list.add(toMetric.get_metricWindow());
-        Map<Integer, Double> merged = JStormUtils.mergeMapList(list);
-
-        toMetric.set_metricWindow(merged);
-
-        return toMetric;
-    }
-    
-    public static MetricInfo mergeMetricInfo(MetricInfo from, MetricInfo to) {
-        if (to == null) {
-            to = mkMetricInfo();
-        }
-        
-        if (from == null) {
-            return to;
-        }
-        
-        to.get_baseMetric().putAll(from.get_baseMetric());
-
-        return to;
-        
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricType.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricType.java 
b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricType.java
new file mode 100644
index 0000000..0203f9e
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricType.java
@@ -0,0 +1,50 @@
+package com.alibaba.jstorm.metric;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The kinds of metric (counter, gauge, meter, histogram, timer). Each constant carries a
+ * one-letter string code ({@code v}) and an integer code ({@code t}), and can be looked
+ * up from either code through the {@code parse} methods.
+ *
+ * @author Cody ([email protected])
+ * @since 2.0.5
+ */
+public enum MetricType {
+    COUNTER("C", 1), GAUGE("G", 2), METER("M", 3), HISTOGRAM("H", 4), 
TIMER("T", 5);
+
+    // string code of this type, e.g. "C" for COUNTER
+    private String v;
+    // integer code of this type, e.g. 1 for COUNTER
+    private int t;
+
+    MetricType(String v, int t) {
+        this.v = v;
+        this.t = t;
+    }
+
+    /**
+     * @return the integer code of this type
+     */
+    public int getT() {
+        return this.t;
+    }
+
+    /**
+     * @return the string code of this type
+     */
+    public String getV() {
+        return this.v;
+    }
+
+    // reverse lookup tables: string code -> type and integer code -> type
+    private static final Map<String, MetricType> valueMap = new 
HashMap<String, MetricType>();
+    private static final Map<Integer, MetricType> typeMap = new 
HashMap<Integer, MetricType>();
+
+    // populate both lookup tables from the declared constants
+    static {
+        for (MetricType type : MetricType.values()) {
+            typeMap.put(type.getT(), type);
+            valueMap.put(type.getV(), type);
+        }
+    }
+
+    /**
+     * Looks up a type by its code given as a single character.
+     *
+     * @param ch code character, e.g. {@code 'C'}
+     * @return the matching type, or {@code null} if no type uses that code
+     */
+    public static MetricType parse(char ch) {
+        return parse(ch + "");
+    }
+
+    /**
+     * Looks up a type by its string code.
+     *
+     * @param s string code, e.g. {@code "C"}
+     * @return the matching type, or {@code null} if no type uses that code
+     */
+    public static MetricType parse(String s) {
+        return valueMap.get(s);
+    }
+
+    /**
+     * Looks up a type by its integer code.
+     *
+     * @param t integer code, e.g. {@code 1}
+     * @return the matching type, or {@code null} if no type uses that code
+     */
+    public static MetricType parse(int t) {
+        return typeMap.get(t);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricUtils.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricUtils.java 
b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricUtils.java
new file mode 100644
index 0000000..e76effd
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricUtils.java
@@ -0,0 +1,600 @@
+package com.alibaba.jstorm.metric;
+
+import backtype.storm.generated.MetricInfo;
+import backtype.storm.generated.MetricSnapshot;
+import backtype.storm.generated.TopologyMetric;
+import com.alibaba.jstorm.client.ConfigExtension;
+import com.alibaba.jstorm.common.metric.AsmMetric;
+import com.alibaba.jstorm.common.metric.snapshot.*;
+import com.alibaba.jstorm.utils.JStormUtils;
+import com.alibaba.jstorm.utils.TimeUtils;
+import com.codahale.metrics.*;
+import com.codahale.metrics.Timer;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Cody ([email protected])
+ * @since 2.0.5
+ */
+public class MetricUtils {
+    private static final Logger LOG = 
LoggerFactory.getLogger(MetricUtils.class);
+
+    // separator character placed between the parts of a metric name
+    public static final char AT = '@';
+    // the same separator as a String
+    public static final String DELIM = AT + "";
+    public static final String EMPTY = "";
+    // group used by the name builders below when the caller does not supply one
+    public static final String DEFAULT_GROUP = "sys";
+
+    // NOTE(review): neither limit is referenced in the visible part of this class;
+    // their exact meaning should be confirmed at the call sites.
+    public static final int MAX_POINTS_PER_WORKER = 200;
+    public static final int NETTY_METRIC_PAGE_SIZE = 200;
+
+    /**
+     * Tells whether a metric id is valid; any non-zero id is considered valid.
+     *
+     * @param metricId metric id to check
+     * @return {@code true} if {@code metricId} is not 0
+     */
+    public static boolean isValidId(long metricId) {
+        return metricId != 0;
+    }
+
+    /**
+     * Creates a {@link MetricInfo} whose metrics map is initialized to an empty HashMap.
+     *
+     * @return a new, empty metric info
+     */
+    public static MetricInfo mkMetricInfo() {
+        MetricInfo ret = new MetricInfo();
+        ret.set_metrics(new HashMap<String, Map<Integer, MetricSnapshot>>());
+
+        return ret;
+    }
+
+    /**
+     * Creates a {@link TopologyMetric} with all six sections (topology, component, worker,
+     * task, stream, netty) set to a fresh {@link MetricInfo}.
+     * NOTE(review): these sections are created with {@code new MetricInfo()} rather than
+     * {@code mkMetricInfo()}, so their metrics map is not explicitly set here.
+     *
+     * @return a new topology metric with empty sections
+     */
+    public static TopologyMetric mkTopologyMetric() {
+        TopologyMetric emptyTopologyMetric = new TopologyMetric();
+
+        emptyTopologyMetric.set_topologyMetric(new MetricInfo());
+        emptyTopologyMetric.set_componentMetric(new MetricInfo());
+        emptyTopologyMetric.set_workerMetric(new MetricInfo());
+        emptyTopologyMetric.set_taskMetric(new MetricInfo());
+        emptyTopologyMetric.set_streamMetric(new MetricInfo());
+        emptyTopologyMetric.set_nettyMetric(new MetricInfo());
+
+        return emptyTopologyMetric;
+    }
+
+    /**
+     * Decides whether netty metrics are enabled for a topology: they are enabled only when
+     * the value of {@code topology.workers} is strictly below the limit returned by
+     * {@code ConfigExtension.getTopologyMaxWorkerNumForNettyMetrics}.
+     *
+     * @param stormConf topology configuration
+     * @return {@code true} if the worker number is below the configured limit
+     */
+    public static boolean isEnableNettyMetrics(Map stormConf) {
+        int maxWorkerNumForNetty = 
ConfigExtension.getTopologyMaxWorkerNumForNettyMetrics(stormConf);
+        // second argument of parseInt is presumably the fallback when the key is absent -- TODO confirm
+        int workerNum = 
JStormUtils.parseInt(stormConf.get("topology.workers"), 1);
+        return workerNum < maxWorkerNumForNetty;
+    }
+
+    /**
+     * Builds a full metric name. A metric name is composed of
+     * {@code type@topologyId@componentId@taskId@streamId@group@name} for non-worker metrics,
+     * or {@code type@topologyId@host@port@group@name} for worker metrics.
+     * This method produces the first (seven-part) form from the given parts.
+     */
+    public static String metricName(String type, String topologyId, String 
componentId, int taskId, String streamId, String group, String name) {
+        return concat(type, topologyId, componentId, taskId, streamId, group, 
name);
+    }
+
+    /**
+     * Builds the name of a stream-level metric. The type part is the STREAM meta type code
+     * followed by the metric type code; the group is {@code DEFAULT_GROUP}.
+     */
+    public static String streamMetricName(String topologyId, String 
componentId, int taskId, String streamId, String name, MetricType type) {
+        return concat(MetaType.STREAM.getV() + type.getV(), topologyId, 
componentId, taskId, streamId, DEFAULT_GROUP, name);
+    }
+
+    /**
+     * Builds the name of a worker-level metric for a specific worker (host and port).
+     * The type part is the WORKER meta type code followed by the metric type code; the
+     * group is {@code DEFAULT_GROUP}.
+     */
+    public static String workerMetricName(String topologyId, String host, int 
port, String name, MetricType type) {
+        return concat(MetaType.WORKER.getV() + type.getV(), topologyId, host, 
port, DEFAULT_GROUP, name);
+    }
+
+    /**
+     * Builds a worker-level metric name without worker identity: topology id and host are
+     * left empty and the port is 0.
+     */
+    public static String workerMetricName(String name, MetricType type) {
+        return concat(MetaType.WORKER.getV() + type.getV(), EMPTY, EMPTY, 0, 
DEFAULT_GROUP, name);
+    }
+
+    /**
+     * Builds a netty metric name: NETTY meta type code plus metric type code, empty
+     * topology id and host, port 0, and the {@code JStormMetrics.NETTY_GROUP} group.
+     */
+    public static String nettyMetricName(String name, MetricType type) {
+        return concat(MetaType.NETTY.getV() + type.getV(), EMPTY, EMPTY, 0, 
JStormMetrics.NETTY_GROUP, name);
+    }
+
+    /**
+     * Builds the leading part of a worker metric name: WORKER meta type code, topology id,
+     * host and port. Note that, unlike the full names, no metric type code is appended to
+     * the meta type code here.
+     */
+    public static String workerMetricPrefix(String topologyId, String host, 
int port) {
+        return concat(MetaType.WORKER.getV(), topologyId, host, port);
+    }
+
+    /**
+     * Builds the name of a task-level metric in the {@code DEFAULT_GROUP} group.
+     * The stream id part is left empty.
+     */
+    public static String taskMetricName(String topologyId, String componentId, 
int taskId, String name, MetricType type) {
+        return concat(MetaType.TASK.getV() + type.getV(), topologyId, 
componentId, taskId, EMPTY, DEFAULT_GROUP, name);
+    }
+
+    /**
+     * Builds a task-level metric name in the given group; the stream id segment is EMPTY.
+     */
+    public static String taskMetricName(String topologyId, String componentId, int taskId, String group, String name, MetricType type) {
+        final String typeCode = MetaType.TASK.getV() + type.getV();
+        return concat(typeCode, topologyId, componentId, taskId, EMPTY, group, name);
+    }
+
+    /**
+     * Builds a component-level metric name in the default group; task id is 0 and the stream id segment is EMPTY.
+     */
+    public static String compMetricName(String topologyId, String componentId, String name, MetricType type) {
+        final String typeCode = MetaType.COMPONENT.getV() + type.getV();
+        return concat(typeCode, topologyId, componentId, 0, EMPTY, DEFAULT_GROUP, name);
+    }
+
+    /**
+     * Replaces every occurrence of DELIM in the given name with EMPTY; a name without DELIM is returned as is.
+     */
+    public static String removeDelimIfPossible(String name) {
+        return name.contains(DELIM) ? name.replace(DELIM, EMPTY) : name;
+    }
+
+    /**
+     * Parses the meta type from the first character of a full metric name.
+     */
+    public static MetaType metaType(String name) {
+        final String metaCode = name.charAt(0) + EMPTY;
+        return MetaType.parse(metaCode);
+    }
+
+    /**
+     * Parses the metric type from the second character of a full metric name.
+     */
+    public static MetricType metricType(String name) {
+        final String typeCode = name.charAt(1) + EMPTY;
+        return MetricType.parse(typeCode);
+    }
+
+    /**
+     * Turns a stream metric name into a task metric name: the meta type code becomes TASK (the metric type
+     * character is kept) and the third-from-last segment (stream id) is blanked. Names with fewer than
+     * 7 segments are only re-joined.
+     */
+    public static String stream2taskName(String old) {
+        final String[] segments = old.split(DELIM);
+        final int count = segments.length;
+        if (count < 7) {
+            return concat(segments);
+        }
+        segments[0] = MetaType.TASK.getV() + segments[0].charAt(1);
+        segments[count - 3] = EMPTY;
+        return concat(segments);
+    }
+
+    /**
+     * Turns a task metric name into a component metric name: the meta type code becomes COMPONENT, the
+     * fourth-from-last segment (task id) becomes "0" and the third-from-last segment (stream id) is blanked.
+     * Names with fewer than 7 segments are only re-joined.
+     */
+    public static String task2compName(String old) {
+        final String[] segments = old.split(DELIM);
+        final int count = segments.length;
+        if (count < 7) {
+            return concat(segments);
+        }
+        segments[0] = MetaType.COMPONENT.getV() + segments[0].charAt(1);
+        segments[count - 4] = "0";
+        segments[count - 3] = EMPTY;
+        return concat(segments);
+    }
+
+    /**
+     * Same rewrite as task2compName (COMPONENT meta type, task id "0", blank stream id), and additionally
+     * keeps only the part of the metric name that follows its first '.', when there is one.
+     * Names with fewer than 7 segments are only re-joined.
+     */
+    public static String task2MergeCompName(String old) {
+        final String[] segments = old.split(DELIM);
+        final int count = segments.length;
+        if (count < 7) {
+            return concat(segments);
+        }
+        segments[0] = MetaType.COMPONENT.getV() + segments[0].charAt(1);
+        segments[count - 4] = "0";
+        segments[count - 3] = EMPTY;
+
+        final String leaf = segments[count - 1];
+        final int dot = leaf.indexOf('.');
+        if (dot >= 0) {
+            segments[count - 1] = leaf.substring(dot + 1);
+        }
+        return concat(segments);
+    }
+
+    /**
+     * Turns a component metric name into a topology metric name laid out as
+     * type + topologyId + host (EMPTY) + port ("0") + group + name.
+     * There is no length check here: the input must split into at least 7 segments, otherwise an
+     * index-out-of-bounds exception is thrown.
+     */
+    public static String comp2topologyName(String old) {
+        final String[] segments = old.split(DELIM);
+        final String topologyType = MetaType.TOPOLOGY.getV() + segments[0].charAt(1);
+        return concat(topologyType, segments[1], EMPTY, "0", segments[5], segments[6]);
+    }
+
+    /**
+     * Turns a worker metric name into a topology metric name: the meta type code becomes TOPOLOGY, host is
+     * blanked and port becomes "0". Names with fewer than 5 segments are only re-joined.
+     */
+    public static String worker2topologyName(String old) {
+        final String[] segments = old.split(DELIM);
+        if (segments.length < 5) {
+            return concat(segments);
+        }
+        segments[0] = MetaType.TOPOLOGY.getV() + segments[0].charAt(1);
+        segments[2] = EMPTY; // host
+        segments[3] = "0";   // port
+        return concat(segments);
+    }
+
+    /**
+     * Turns a topology metric name into a cluster metric name by replacing its second segment with
+     * JStormMetrics.CLUSTER_METRIC_KEY. The input must split into at least 2 segments.
+     */
+    public static String topo2clusterName(String old) {
+        final String[] segments = old.split(DELIM);
+        segments[1] = JStormMetrics.CLUSTER_METRIC_KEY;
+        return concat(segments);
+    }
+
+    /**
+     * Joins the arguments with DELIM. When the last argument is a String, any DELIM inside it is stripped
+     * first (via removeDelimIfPossible) so the trailing metric name cannot introduce extra segments.
+     * The caller's array is left untouched.
+     *
+     * @param args parts to join; each is appended via StringBuilder.append(Object)
+     * @return the joined string, or an empty string when no arguments are given
+     */
+    public static String concat(Object... args) {
+        // previously an empty argument list failed with ArrayIndexOutOfBoundsException on args[-1]
+        if (args == null || args.length == 0) {
+            return "";
+        }
+        StringBuilder sb = new StringBuilder(50);
+        int last = args.length - 1;
+        for (int i = 0; i < last; i++) {
+            sb.append(args[i]).append(DELIM);
+        }
+        // sanitize a local copy instead of writing back into the (possibly caller-owned) array
+        Object tail = args[last];
+        if (tail instanceof String) {
+            tail = removeDelimIfPossible((String) tail);
+        }
+        sb.append(tail);
+        return sb.toString();
+    }
+
+    /**
+     * Joins the arguments with DELIM without sanitizing any of them.
+     *
+     * @param args parts to join; each is appended via StringBuilder.append(Object)
+     * @return the joined string, or an empty string when no arguments are given
+     */
+    public static String concat2(Object... args) {
+        // previously an empty argument list failed in deleteCharAt(-1)
+        if (args == null || args.length == 0) {
+            return "";
+        }
+        StringBuilder sb = new StringBuilder(50);
+        sb.append(args[0]);
+        for (int i = 1; i < args.length; i++) {
+            sb.append(DELIM).append(args[i]);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Joins the arguments with a caller-supplied delimiter.
+     *
+     * @param delim separator placed between consecutive arguments; may be of any length
+     * @param args  parts to join; each is appended via StringBuilder.append(Object)
+     * @return the joined string, or an empty string when no arguments are given
+     */
+    public static String concat3(String delim, Object... args) {
+        // previously an empty argument list failed in deleteCharAt(-1)
+        if (args == null || args.length == 0) {
+            return "";
+        }
+        StringBuilder sb = new StringBuilder(50);
+        sb.append(args[0]);
+        // the delimiter goes only between elements; the old code appended it after every element and then
+        // removed a single trailing character, which is wrong for any delimiter not exactly one char long
+        for (int i = 1; i < args.length; i++) {
+            sb.append(delim).append(args[i]);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Creates a Histogram backed by a new ExponentiallyDecayingReservoir and feeds it the points carried
+     * by the given snapshot (a null point list is tolerated by updateHistogramPoints).
+     */
+    public static Histogram metricSnapshot2Histogram(MetricSnapshot snapshot) {
+        final Histogram rebuilt = new Histogram(new ExponentiallyDecayingReservoir());
+        updateHistogramPoints(rebuilt, snapshot.get_points());
+        return rebuilt;
+    }
+
+    /**
+     * Creates a Timer backed by a new ExponentiallyDecayingReservoir and feeds it the points carried
+     * by the given snapshot (a null point list is tolerated by updateTimerPoints).
+     */
+    public static Timer metricSnapshot2Timer(MetricSnapshot snapshot) {
+        final Timer rebuilt = new Timer(new ExponentiallyDecayingReservoir());
+        updateTimerPoints(rebuilt, snapshot.get_points());
+        return rebuilt;
+    }
+
+    /**
+     * Feeds every point into the histogram; does nothing when the point list is null.
+     */
+    public static void updateHistogramPoints(Histogram histogram, List<Long> points) {
+        if (points == null) {
+            return;
+        }
+        for (Long sample : points) {
+            histogram.update(sample);
+        }
+    }
+
+    /**
+     * Feeds every point into the timer as a duration in milliseconds; does nothing when the point list is null.
+     */
+    public static void updateTimerPoints(Timer timer, List<Long> points) {
+        if (points == null) {
+            return;
+        }
+        for (Long sample : points) {
+            timer.update(sample, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Converts each counter snapshot in the map to its thrift MetricSnapshot, keeping the same keys.
+     * Every value is cast to AsmCounterSnapshot.
+     */
+    public static Map<Integer, MetricSnapshot> toThriftCounterSnapshots(Map<Integer, AsmSnapshot> snapshots) {
+        final Map<Integer, MetricSnapshot> thriftByKey = Maps.newHashMapWithExpectedSize(snapshots.size());
+        for (Map.Entry<Integer, AsmSnapshot> item : snapshots.entrySet()) {
+            final AsmCounterSnapshot counter = (AsmCounterSnapshot) item.getValue();
+            thriftByKey.put(item.getKey(), convert(counter));
+        }
+        return thriftByKey;
+    }
+
+    /**
+     * Converts each gauge snapshot in the map to its thrift MetricSnapshot, keeping the same keys.
+     * Every value is cast to AsmGaugeSnapshot.
+     */
+    public static Map<Integer, MetricSnapshot> toThriftGaugeSnapshots(Map<Integer, AsmSnapshot> snapshots) {
+        final Map<Integer, MetricSnapshot> thriftByKey = Maps.newHashMapWithExpectedSize(snapshots.size());
+        for (Map.Entry<Integer, AsmSnapshot> item : snapshots.entrySet()) {
+            final AsmGaugeSnapshot gauge = (AsmGaugeSnapshot) item.getValue();
+            thriftByKey.put(item.getKey(), convert(gauge));
+        }
+        return thriftByKey;
+    }
+
+    /**
+     * Converts each meter snapshot in the map to its thrift MetricSnapshot, keeping the same keys.
+     * Every value is cast to AsmMeterSnapshot.
+     */
+    public static Map<Integer, MetricSnapshot> toThriftMeterSnapshots(Map<Integer, AsmSnapshot> snapshots) {
+        final Map<Integer, MetricSnapshot> thriftByKey = Maps.newHashMapWithExpectedSize(snapshots.size());
+        for (Map.Entry<Integer, AsmSnapshot> item : snapshots.entrySet()) {
+            final AsmMeterSnapshot meter = (AsmMeterSnapshot) item.getValue();
+            thriftByKey.put(item.getKey(), convert(meter));
+        }
+        return thriftByKey;
+    }
+
+    /**
+     * Converts each histogram snapshot in the map to its thrift MetricSnapshot, keeping the same keys.
+     * Entries whose conversion yields null are left out of the result.
+     */
+    public static Map<Integer, MetricSnapshot> toThriftHistoSnapshots(MetaType metaType, Map<Integer, AsmSnapshot> snapshots) {
+        final Map<Integer, MetricSnapshot> thriftByKey = Maps.newHashMapWithExpectedSize(snapshots.size());
+        for (Map.Entry<Integer, AsmSnapshot> item : snapshots.entrySet()) {
+            final MetricSnapshot converted = convert(metaType, (AsmHistogramSnapshot) item.getValue());
+            if (converted == null) {
+                continue;
+            }
+            thriftByKey.put(item.getKey(), converted);
+        }
+        return thriftByKey;
+    }
+
+    /**
+     * Converts each timer snapshot in the map to its thrift MetricSnapshot, keeping the same keys.
+     * Entries whose conversion yields null are left out of the result.
+     */
+    public static Map<Integer, MetricSnapshot> toThriftTimerSnapshots(MetaType metaType, Map<Integer, AsmSnapshot> snapshots) {
+        final Map<Integer, MetricSnapshot> thriftByKey = Maps.newHashMapWithExpectedSize(snapshots.size());
+        for (Map.Entry<Integer, AsmSnapshot> item : snapshots.entrySet()) {
+            final MetricSnapshot converted = convert(metaType, (AsmTimerSnapshot) item.getValue());
+            if (converted == null) {
+                continue;
+            }
+            thriftByKey.put(item.getKey(), converted);
+        }
+        return thriftByKey;
+    }
+
+    /**
+     * Converts a counter snapshot to a thrift MetricSnapshot: copies the metric id, the timestamp passed
+     * through TimeUtils.alignTimeToMin, and the counter value as the long value.
+     */
+    public static MetricSnapshot convert(AsmCounterSnapshot snapshot) {
+        final MetricSnapshot thrift = new MetricSnapshot();
+        thrift.set_metricType(MetricType.COUNTER.getT());
+        thrift.set_metricId(snapshot.getMetricId());
+        thrift.set_ts(TimeUtils.alignTimeToMin(snapshot.getTs()));
+        thrift.set_longValue(snapshot.getV());
+        return thrift;
+    }
+
+    /**
+     * Converts a gauge snapshot to a thrift MetricSnapshot: copies the metric id, the timestamp passed
+     * through TimeUtils.alignTimeToMin, and the gauge value as the double value.
+     */
+    public static MetricSnapshot convert(AsmGaugeSnapshot snapshot) {
+        final MetricSnapshot thrift = new MetricSnapshot();
+        thrift.set_metricType(MetricType.GAUGE.getT());
+        thrift.set_metricId(snapshot.getMetricId());
+        thrift.set_ts(TimeUtils.alignTimeToMin(snapshot.getTs()));
+        thrift.set_doubleValue(snapshot.getV());
+        return thrift;
+    }
+
+    /**
+     * Converts a meter snapshot to a thrift MetricSnapshot: copies the metric id, the timestamp passed
+     * through TimeUtils.alignTimeToMin, and the m1, m5, m15 and mean rates.
+     */
+    public static MetricSnapshot convert(AsmMeterSnapshot snapshot) {
+        final MetricSnapshot thrift = new MetricSnapshot();
+        thrift.set_metricType(MetricType.METER.getT());
+        thrift.set_metricId(snapshot.getMetricId());
+        thrift.set_ts(TimeUtils.alignTimeToMin(snapshot.getTs()));
+
+        thrift.set_mean(snapshot.getMean());
+        thrift.set_m1(snapshot.getM1());
+        thrift.set_m5(snapshot.getM5());
+        thrift.set_m15(snapshot.getM15());
+        return thrift;
+    }
+
+    /**
+     * Converts a histogram snapshot to a thrift MetricSnapshot: metric id, minute-aligned timestamp,
+     * min/max, percentiles (p50..p999), mean and standard deviation. Raw points are attached only for
+     * COMPONENT and TOPOLOGY meta types; other meta types get an empty point list.
+     * Note: skipping never-updated histograms (returning null) is currently disabled, so this method
+     * always returns a snapshot.
+     */
+    public static MetricSnapshot convert(MetaType metaType, AsmHistogramSnapshot snapshot) {
+        final MetricSnapshot thrift = new MetricSnapshot();
+        thrift.set_metricId(snapshot.getMetricId());
+        thrift.set_ts(TimeUtils.alignTimeToMin(snapshot.getTs()));
+        thrift.set_metricType(MetricType.HISTOGRAM.getT());
+
+        final Snapshot stats = snapshot.getSnapshot();
+        thrift.set_min(stats.getMin());
+        thrift.set_max(stats.getMax());
+        thrift.set_p50(stats.getMedian());
+        thrift.set_p75(stats.get75thPercentile());
+        thrift.set_p95(stats.get95thPercentile());
+        thrift.set_p98(stats.get98thPercentile());
+        thrift.set_p99(stats.get99thPercentile());
+        thrift.set_p999(stats.get999thPercentile());
+        thrift.set_mean(stats.getMean());
+        thrift.set_stddev(stats.getStdDev());
+
+        // points are uploaded only for component and topology metrics
+        if (metaType != MetaType.COMPONENT && metaType != MetaType.TOPOLOGY) {
+            thrift.set_points(new ArrayList<Long>(0));
+            return thrift;
+        }
+
+        List<Long> samples = Lists.newArrayListWithCapacity(stats.getValues().length);
+        for (Long sample : stats.getValues()) {
+            samples.add(sample);
+        }
+        thrift.set_points(samples);
+        return thrift;
+    }
+
+    /**
+     * Converts a timer snapshot to a thrift MetricSnapshot: metric id, minute-aligned timestamp, the
+     * histogram statistics (min/max, p50..p999, mean, stddev) and the meter rates (m1, m5, m15).
+     * Raw points are attached only for COMPONENT and TOPOLOGY meta types; other meta types get an
+     * empty point list.
+     * Note: skipping never-updated timers (returning null) is currently disabled, so this method
+     * always returns a snapshot.
+     *
+     * @param metaType meta type of the metric, decides whether raw points are uploaded
+     * @param snapshot timer snapshot providing the histogram and meter parts
+     * @return the populated thrift snapshot
+     */
+    public static MetricSnapshot convert(MetaType metaType, AsmTimerSnapshot snapshot) {
+        MetricSnapshot ret = new MetricSnapshot();
+        ret.set_metricId(snapshot.getMetricId());
+        ret.set_ts(TimeUtils.alignTimeToMin(snapshot.getTs()));
+        ret.set_metricType(MetricType.TIMER.getT());
+
+        Snapshot ws = snapshot.getHistogram();
+        ret.set_min(ws.getMin());
+        ret.set_max(ws.getMax());
+        ret.set_p50(ws.getMedian());
+        ret.set_p75(ws.get75thPercentile());
+        ret.set_p95(ws.get95thPercentile());
+        ret.set_p98(ws.get98thPercentile());
+        ret.set_p99(ws.get99thPercentile());
+        ret.set_p999(ws.get999thPercentile());
+        ret.set_mean(ws.getMean());
+        ret.set_stddev(ws.getStdDev());
+
+        AsmMeterSnapshot ms = snapshot.getMeter();
+        ret.set_m1(ms.getM1());
+        // fixed: the 5-minute rate used to be written with set_m15, leaving m5 unset
+        ret.set_m5(ms.getM5());
+        ret.set_m15(ms.getM15());
+
+        // only upload points for component and topology metrics
+        if (metaType == MetaType.COMPONENT || metaType == MetaType.TOPOLOGY) {
+            List<Long> pts = Lists.newArrayListWithCapacity(ws.getValues().length);
+            for (Long pt : ws.getValues()) {
+                pts.add(pt);
+            }
+            ret.set_points(pts);
+        } else {
+            ret.set_points(new ArrayList<Long>(0));
+        }
+
+        return ret;
+    }
+
+    /**
+     * Returns the last DELIM-separated segment of a full metric name.
+     */
+    public static String getMetricName(String fullName) {
+        final String[] segments = fullName.split(DELIM);
+        final int lastIndex = segments.length - 1;
+        return segments[lastIndex];
+    }
+
+    /**
+     * Renders an object for logging. A MetricSnapshot is formatted by the helper matching its metric type
+     * (counter, gauge, meter, histogram, timer); anything else, or a snapshot of another type, falls back
+     * to toString().
+     */
+    public static String str(Object obj) {
+        if (!(obj instanceof MetricSnapshot)) {
+            return obj.toString();
+        }
+
+        final MetricSnapshot snap = (MetricSnapshot) obj;
+        final MetricType kind = MetricType.parse(snap.get_metricType());
+        if (kind == MetricType.COUNTER) {
+            return counterStr(snap);
+        }
+        if (kind == MetricType.GAUGE) {
+            return gaugeStr(snap);
+        }
+        if (kind == MetricType.METER) {
+            return meterStr(snap);
+        }
+        if (kind == MetricType.HISTOGRAM) {
+            return histogramStr(snap);
+        }
+        if (kind == MetricType.TIMER) {
+            return timerStr(snap);
+        }
+        return obj.toString();
+    }
+
+    /**
+     * Formats a counter snapshot as "id:&lt;metricId&gt;,v:&lt;longValue&gt;".
+     */
+    public static String counterStr(MetricSnapshot snapshot) {
+        return "id:" + snapshot.get_metricId() + ",v:" + snapshot.get_longValue();
+    }
+
+    /**
+     * Formats a gauge snapshot as "id:&lt;metricId&gt;,v:&lt;doubleValue&gt;".
+     */
+    public static String gaugeStr(MetricSnapshot snapshot) {
+        return "id:" + snapshot.get_metricId() + ",v:" + snapshot.get_doubleValue();
+    }
+
+    /**
+     * Formats a meter snapshot as "id:&lt;metricId&gt;,m1:..,m5:..,m15:..".
+     */
+    public static String meterStr(MetricSnapshot snapshot) {
+        return "id:" + snapshot.get_metricId()
+                + ",m1:" + snapshot.get_m1()
+                + ",m5:" + snapshot.get_m5()
+                + ",m15:" + snapshot.get_m15();
+    }
+
+    /**
+     * Formats a histogram snapshot as
+     * "histogram(id:..,min:..,max:..,mean:..,p50:..,p75:..,p95:..,p98:..,p99:..,pts:&lt;point count&gt;)".
+     */
+    public static String histogramStr(MetricSnapshot snapshot) {
+        return "histogram"
+                + "(id:" + snapshot.get_metricId()
+                + ",min:" + snapshot.get_min()
+                + ",max:" + snapshot.get_max()
+                + ",mean:" + snapshot.get_mean()
+                + ",p50:" + snapshot.get_p50()
+                + ",p75:" + snapshot.get_p75()
+                + ",p95:" + snapshot.get_p95()
+                + ",p98:" + snapshot.get_p98()
+                + ",p99:" + snapshot.get_p99()
+                + ",pts:" + snapshot.get_points_size()
+                + ")";
+    }
+
+    /**
+     * Formats a timer snapshot as
+     * "timer(id:..,min:..,max:..,mean:..,p50:..,p75:..,p95:..,p98:..,p99:..,m1:..,m5:..,m15:..,pts:&lt;point count&gt;)".
+     */
+    public static String timerStr(MetricSnapshot snapshot) {
+        return "timer"
+                + "(id:" + snapshot.get_metricId()
+                + ",min:" + snapshot.get_min()
+                + ",max:" + snapshot.get_max()
+                + ",mean:" + snapshot.get_mean()
+                + ",p50:" + snapshot.get_p50()
+                + ",p75:" + snapshot.get_p75()
+                + ",p95:" + snapshot.get_p95()
+                + ",p98:" + snapshot.get_p98()
+                + ",p99:" + snapshot.get_p99()
+                + ",m1:" + snapshot.get_m1()
+                + ",m5:" + snapshot.get_m5()
+                + ",m15:" + snapshot.get_m15()
+                + ",pts:" + snapshot.get_points_size()
+                + ")";
+    }
+
+    /**
+     * Logs, at info level, one line with the metric name followed by the default value of each snapshot
+     * in the map ("win:&lt;key&gt;, v:&lt;value&gt;; " per entry).
+     */
+    public static void printMetricSnapshot(AsmMetric metric, Map<Integer, AsmSnapshot> snapshots) {
+        final StringBuilder line = new StringBuilder(128);
+        line.append("metric:").append(metric.getMetricName()).append(", ");
+        for (Map.Entry<Integer, AsmSnapshot> window : snapshots.entrySet()) {
+            final double value = getSnapshotDefaultValue(window.getValue());
+            line.append("win:").append(window.getKey());
+            line.append(", v:").append(value).append("; ");
+        }
+
+        LOG.info(line.toString());
+    }
+
+    /**
+     * Picks the representative value of a snapshot: the value for counters and gauges, m1 for meters,
+     * the mean for histograms and timers, and 0 for any other snapshot type.
+     */
+    public static double getSnapshotDefaultValue(AsmSnapshot snapshot) {
+        double value = 0;
+        if (snapshot instanceof AsmCounterSnapshot) {
+            value = ((AsmCounterSnapshot) snapshot).getV();
+        } else if (snapshot instanceof AsmGaugeSnapshot) {
+            value = ((AsmGaugeSnapshot) snapshot).getV();
+        } else if (snapshot instanceof AsmMeterSnapshot) {
+            value = ((AsmMeterSnapshot) snapshot).getM1();
+        } else if (snapshot instanceof AsmHistogramSnapshot) {
+            value = ((AsmHistogramSnapshot) snapshot).getSnapshot().getMean();
+        } else if (snapshot instanceof AsmTimerSnapshot) {
+            value = ((AsmTimerSnapshot) snapshot).getHistogram().getMean();
+        }
+        return value;
+    }
+
+    /**
+     * Logs every metric snapshot contained in the given MetricInfo.
+     */
+    public static void printMetricInfo(MetricInfo metricInfo) {
+        final Map<String, Map<Integer, MetricSnapshot>> allMetrics = metricInfo.get_metrics();
+        iterateMap(allMetrics);
+    }
+
+    /**
+     * Logs the metric snapshots of the given MetricInfo whose full name contains at least one of the
+     * given strings.
+     */
+    public static void printMetricInfo(MetricInfo metricInfo, Set<String> metrics) {
+        final Map<String, Map<Integer, MetricSnapshot>> allMetrics = metricInfo.get_metrics();
+        iterateMap(allMetrics, metrics);
+    }
+
+    /**
+     * Logs every entry of the map; equivalent to calling the two-argument overload with a null filter.
+     */
+    public static <T> void iterateMap(Map<String, Map<Integer, T>> map) {
+        final Set<String> noFilter = null;
+        iterateMap(map, noFilter);
+    }
+
+    /**
+     * Logs, at info level, every (name, key, value) triple of the nested map. When a filter set is given,
+     * only names containing at least one of its strings are logged; a null filter logs everything.
+     * MetricSnapshot values are rendered through MetricUtils.str, other values through toString().
+     */
+    public static <T> void iterateMap(Map<String, Map<Integer, T>> map, Set<String> metrics) {
+        for (Map.Entry<String, Map<Integer, T>> named : map.entrySet()) {
+            final String name = named.getKey();
+
+            boolean selected = (metrics == null);
+            if (!selected) {
+                for (String wanted : metrics) {
+                    if (name.contains(wanted)) {
+                        selected = true;
+                        break;
+                    }
+                }
+            }
+            if (!selected) {
+                continue;
+            }
+
+            for (Map.Entry<Integer, T> window : named.getValue().entrySet()) {
+                final T value = window.getValue();
+                final String rendered = (value instanceof MetricSnapshot) ? MetricUtils.str(value) : value.toString();
+                LOG.info("metric:{}, win:{}, data:{}", name, window.getKey(), rendered);
+            }
+        }
+    }
+
+    /**
+     * Invokes the callback once per map entry, passing the entry followed by the extra-argument array.
+     */
+    private static <T> void iter(Map<String, T> map, Func func, Object... args) {
+        for (Map.Entry<String, T> current : map.entrySet()) {
+            func.exec(current, args);
+        }
+    }
+
+
+    /**
+     * Callback applied by iter(Map, Func, Object...) to each map entry.
+     */
+    public interface Func {
+        /** Invoked with a variable argument list; iter passes the current map entry followed by its extra-argument array. */
+        void exec(Object... args);
+    }
+
+
+    /**
+     * Logs the default value of every metric that has a snapshot under AsmWindow.M1_WINDOW, one line per
+     * metric in the format type|value|name. The value is the long value for counters, the double value
+     * for gauges, m1 for meters, the mean for histograms and timers, and 0 for any other type.
+     * Nothing is logged when the metric map is null.
+     */
+    public static void logMetrics(MetricInfo metricInfo) {
+        final Map<String, Map<Integer, MetricSnapshot>> metrics = metricInfo.get_metrics();
+        if (metrics == null) {
+            return;
+        }
+
+        LOG.info("\nprint metrics:");
+        for (Map.Entry<String, Map<Integer, MetricSnapshot>> named : metrics.entrySet()) {
+            final MetricSnapshot snap = named.getValue().get(AsmWindow.M1_WINDOW);
+            if (snap == null) {
+                continue;
+            }
+
+            final MetricType kind = MetricType.parse(snap.get_metricType());
+            double value = 0;
+            if (kind == MetricType.COUNTER) {
+                value = snap.get_longValue();
+            } else if (kind == MetricType.GAUGE) {
+                value = snap.get_doubleValue();
+            } else if (kind == MetricType.METER) {
+                value = snap.get_m1();
+            } else if (kind == MetricType.HISTOGRAM || kind == MetricType.TIMER) {
+                value = snap.get_mean();
+            }
+            LOG.info("{}|{}|{}", kind, value, named.getKey());
+        }
+        LOG.info("\n");
+    }
+
+    /**
+     * Manual smoke check: prints the topology-level name derived from a sample worker metric name.
+     */
+    public static void main(String[] args) {
+        System.out.println(worker2topologyName("WC@[email protected]@6800@sys@Counter"));
+    }
+}

Reply via email to