http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/ServiceHandler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/ServiceHandler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/ServiceHandler.java
index 513e83f..b60864a 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/ServiceHandler.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/ServiceHandler.java
@@ -18,95 +18,54 @@
  */
 package com.alibaba.jstorm.daemon.nimbus;
 
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.security.InvalidParameterException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.Config;
 import backtype.storm.daemon.Shutdownable;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.ComponentSummary;
-import backtype.storm.generated.Credentials;
-import backtype.storm.generated.ErrorInfo;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.KillOptions;
-import backtype.storm.generated.MetricInfo;
-import backtype.storm.generated.MonitorOptions;
-import backtype.storm.generated.NettyMetric;
+import backtype.storm.generated.*;
 import backtype.storm.generated.Nimbus.Iface;
-import backtype.storm.generated.NimbusSummary;
-import backtype.storm.generated.NotAliveException;
-import backtype.storm.generated.RebalanceOptions;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.generated.SupervisorSummary;
-import backtype.storm.generated.SupervisorWorkers;
-import backtype.storm.generated.TaskComponent;
-import backtype.storm.generated.TaskSummary;
-import backtype.storm.generated.TopologyAssignException;
-import backtype.storm.generated.TopologyInfo;
-import backtype.storm.generated.TopologyInitialStatus;
-import backtype.storm.generated.TopologyMetric;
-import backtype.storm.generated.TopologySummary;
-import backtype.storm.generated.WorkerSummary;
-import backtype.storm.generated.WorkerUploadMetrics;
 import backtype.storm.utils.BufferFileInputStream;
 import backtype.storm.utils.TimeCacheMap;
 import backtype.storm.utils.Utils;
-
 import com.alibaba.jstorm.callback.impl.RemoveTransitionCallback;
-import com.alibaba.jstorm.cluster.Cluster;
-import com.alibaba.jstorm.cluster.Common;
-import com.alibaba.jstorm.cluster.DaemonCommon;
-import com.alibaba.jstorm.cluster.StormBase;
-import com.alibaba.jstorm.cluster.StormClusterState;
-import com.alibaba.jstorm.cluster.StormConfig;
+import com.alibaba.jstorm.client.ConfigExtension;
+import com.alibaba.jstorm.cluster.*;
 import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
-import com.alibaba.jstorm.metric.MetricDef;
+import com.alibaba.jstorm.metric.MetaType;
+import com.alibaba.jstorm.metric.MetricUtils;
 import com.alibaba.jstorm.metric.SimpleJStormMetric;
 import com.alibaba.jstorm.schedule.Assignment;
 import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
 import com.alibaba.jstorm.task.TaskInfo;
 import com.alibaba.jstorm.task.error.TaskError;
-import com.alibaba.jstorm.task.heartbeat.TaskHeartbeat;
-import com.alibaba.jstorm.utils.FailedAssignTopologyException;
-import com.alibaba.jstorm.utils.JStormUtils;
-import com.alibaba.jstorm.utils.NetWorkUtils;
-import com.alibaba.jstorm.utils.PathUtils;
-import com.alibaba.jstorm.utils.Thrift;
-import com.alibaba.jstorm.utils.TimeUtils;
+import com.alibaba.jstorm.utils.*;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileExistsException;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.security.InvalidParameterException;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable.*;
 
 /**
  * Thrift callback, all commands handling entrance
- * 
+ *
  * @author version 1: lixin, version 2:Longda
- * 
  */
 public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
-    private final static Logger LOG = LoggerFactory
-            .getLogger(ServiceHandler.class);
+    private final static Logger LOG = 
LoggerFactory.getLogger(ServiceHandler.class);
 
     public final static int THREAD_NUM = 64;
 
@@ -138,24 +97,18 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
     }
 
     @Override
-    public void submitTopology(String name, String uploadedJarLocation,
-            String jsonConf, StormTopology topology)
-            throws AlreadyAliveException, InvalidTopologyException,
-            TopologyAssignException, TException {
+    public void submitTopology(String name, String uploadedJarLocation, String 
jsonConf, StormTopology topology) throws TException, AlreadyAliveException,
+            InvalidTopologyException, TopologyAssignException {
         SubmitOptions options = new 
SubmitOptions(TopologyInitialStatus.ACTIVE);
-
-        submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology,
-                options);
+        submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, 
options);
     }
 
-    private void makeAssignment(String topologyName, String topologyId,
-            TopologyInitialStatus status) throws FailedAssignTopologyException 
{
+    private void makeAssignment(String topologyName, String topologyId, 
TopologyInitialStatus status) throws FailedAssignTopologyException {
         TopologyAssignEvent assignEvent = new TopologyAssignEvent();
         assignEvent.setTopologyId(topologyId);
         assignEvent.setScratch(false);
         assignEvent.setTopologyName(topologyName);
-        assignEvent.setOldStatus(Thrift
-                .topologyInitialStatusToStormStatus(status));
+        
assignEvent.setOldStatus(Thrift.topologyInitialStatusToStormStatus(status));
 
         TopologyAssign.push(assignEvent);
 
@@ -169,86 +122,100 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
 
     /**
      * Submit one Topology
-     * 
-     * @param topologyname String: topology name
+     *
+     * @param topologyName        String: topology name
      * @param uploadedJarLocation String: already uploaded jar path
-     * @param jsonConf String: jsonConf serialize all toplogy configuration to
-     *            Json
-     * @param topology StormTopology: topology Object
+     * @param jsonConf            String: jsonConf serializes all topology configuration to
+     *                            Json
+     * @param topology            StormTopology: topology Object
      */
     @SuppressWarnings("unchecked")
     @Override
-    public void submitTopologyWithOpts(String topologyname,
-            String uploadedJarLocation, String jsonConf,
-            StormTopology topology, SubmitOptions options)
-            throws AlreadyAliveException, InvalidTopologyException,
-            TopologyAssignException, TException {
-        LOG.info("Receive " + topologyname + ", uploadedJarLocation:"
-                + uploadedJarLocation);
+    public void submitTopologyWithOpts(String topologyName, String 
uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions 
options)
+            throws AlreadyAliveException, InvalidTopologyException, 
TopologyAssignException, TException {
+        LOG.info("Receive " + topologyName + ", uploadedJarLocation:" + 
uploadedJarLocation);
         long start = System.nanoTime();
+
+        // check that the topology name is valid
+        if (!Common.charValidate(topologyName)) {
+            throw new InvalidTopologyException(topologyName + " is not a valid 
topology name");
+        }
+
         try {
-            checkTopologyActive(data, topologyname, false);
+            checkTopologyActive(data, topologyName, false);
         } catch (AlreadyAliveException e) {
-            LOG.info(topologyname + " is already exist ");
+            LOG.info(topologyName + " already exists ");
             throw e;
         } catch (Throwable e) {
             LOG.info("Failed to check whether topology is alive or not", e);
             throw new TException(e);
         }
 
-        int counter = data.getSubmittedCount().incrementAndGet();
-        String topologyId = Common.topologyNameToId(topologyname, counter);
-        data.getPendingSubmitTopoloygs().put(topologyId, null);
-
+        String topologyId = null;
+        synchronized (data) {
+            // avoid the same topology being submitted concurrently
+            Set<String> pendingTopologys =
+                    data.getPendingSubmitTopoloygs().keySet();
+            for (String cachTopologyId : pendingTopologys) {
+                if (cachTopologyId.contains(topologyName + "-"))
+                    throw new AlreadyAliveException(
+                            topologyName + " has already been submitted");
+            }
+            int counter = data.getSubmittedCount().incrementAndGet();
+            topologyId = Common.topologyNameToId(topologyName, counter);
+            data.getPendingSubmitTopoloygs().put(topologyId, null);
+        }
         try {
 
-            Map<Object, Object> serializedConf =
-                    (Map<Object, Object>) JStormUtils.from_json(jsonConf);
+            Map<Object, Object> serializedConf = (Map<Object, Object>) 
JStormUtils.from_json(jsonConf);
             if (serializedConf == null) {
                 LOG.warn("Failed to serialized Configuration");
-                throw new InvalidTopologyException(
-                        "Failed to serilaze topology configuration");
+                throw new InvalidTopologyException("Failed to serialize 
topology configuration");
             }
 
             serializedConf.put(Config.TOPOLOGY_ID, topologyId);
-            serializedConf.put(Config.TOPOLOGY_NAME, topologyname);
+            serializedConf.put(Config.TOPOLOGY_NAME, topologyName);
 
             Map<Object, Object> stormConf;
 
-            stormConf =
-                    NimbusUtils.normalizeConf(conf, serializedConf, topology);
+            stormConf = NimbusUtils.normalizeConf(conf, serializedConf, 
topology);
             LOG.info("Normalized configuration:" + stormConf);
-            data.getTopologyNettyMgr().setTopology(stormConf);
 
-            Map<Object, Object> totalStormConf =
-                    new HashMap<Object, Object>(conf);
+            Map<Object, Object> totalStormConf = new HashMap<Object, 
Object>(conf);
             totalStormConf.putAll(stormConf);
 
-            StormTopology normalizedTopology =
-                    NimbusUtils.normalizeTopology(stormConf, topology, true);
+            StormTopology normalizedTopology = 
NimbusUtils.normalizeTopology(stormConf, topology, true);
 
             // this validates the structure of the topology
-            Common.validate_basic(normalizedTopology, totalStormConf,
-                    topologyId);
-            
+            Common.validate_basic(normalizedTopology, totalStormConf, 
topologyId);
             // don't need generate real topology, so skip 
Common.system_topology
             // Common.system_topology(totalStormConf, topology);
 
             StormClusterState stormClusterState = data.getStormClusterState();
 
+            double metricsSampleRate = 
ConfigExtension.getMetricSampleRate(stormConf);
             // create /local-dir/nimbus/topologyId/xxxx files
-            setupStormCode(conf, topologyId, uploadedJarLocation, stormConf,
-                    normalizedTopology);
+            setupStormCode(conf, topologyId, uploadedJarLocation, stormConf, 
normalizedTopology);
 
             // generate TaskInfo for every bolt or spout in ZK
             // /ZK/tasks/topoologyId/xxx
             setupZkTaskInfo(conf, topologyId, stormClusterState);
 
             // make assignments for a topology
-            LOG.info("Submit for " + topologyname + " with conf "
-                    + serializedConf);
-            makeAssignment(topologyname, topologyId,
-                    options.get_initial_status());
+            LOG.info("Submit for " + topologyName + " with conf " + 
serializedConf);
+            makeAssignment(topologyName, topologyId, 
options.get_initial_status());
+
+            // once the assignment is made for the topology, remove the topologyId from
+            // pendingSubmitTopologys
+            data.getPendingSubmitTopoloygs().remove(topologyId);
+
+            // push start event after startup
+            StartTopologyEvent startEvent = new StartTopologyEvent();
+            startEvent.clusterName = this.data.getClusterName();
+            startEvent.topologyId = topologyId;
+            startEvent.timestamp = System.currentTimeMillis();
+            startEvent.sampleRate = metricsSampleRate;
+            this.data.getMetricRunnable().pushEvent(startEvent);
 
         } catch (FailedAssignTopologyException e) {
             StringBuilder sb = new StringBuilder();
@@ -291,28 +258,27 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
             LOG.error(sb.toString(), e);
             data.getPendingSubmitTopoloygs().remove(topologyId);
             throw new TopologyAssignException(sb.toString());
-        }finally {
-            double spend = (System.nanoTime() - start)/1000000.0d;
-            SimpleJStormMetric.updateHistorgram("submitTopologyWithOpts", 
spend);
-            LOG.info("submitTopologyWithOpts {} spend {}ms", topologyname, 
spend);
+        } finally {
+            double spend = (System.nanoTime() - start) / TimeUtils.NS_PER_US;
+            SimpleJStormMetric.updateNimbusHistogram("submitTopologyWithOpts", 
spend);
+            LOG.info("submitTopologyWithOpts {} costs {}ms", topologyName, 
spend);
         }
 
     }
 
     /**
      * kill topology
-     * 
-     * @param topologyname String topology name
+     *
+     * @param topologyName String topology name
      */
     @Override
-    public void killTopology(String name) throws NotAliveException, TException 
{
-        killTopologyWithOpts(name, new KillOptions());
+    public void killTopology(String topologyName) throws TException, 
NotAliveException {
+        killTopologyWithOpts(topologyName, new KillOptions());
 
     }
 
     @Override
-    public void killTopologyWithOpts(String topologyName, KillOptions options)
-            throws NotAliveException, TException {
+    public void killTopologyWithOpts(String topologyName, KillOptions options) 
throws TException, NotAliveException {
         try {
             checkTopologyActive(data, topologyName, true);
 
@@ -322,17 +288,13 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
             if (options.is_set_wait_secs()) {
                 wait_amt = options.get_wait_secs();
             }
-            NimbusUtils.transitionName(data, topologyName, true,
-                    StatusType.kill, wait_amt);
+            NimbusUtils.transitionName(data, topologyName, true, 
StatusType.kill, wait_amt);
 
-            TopologyMetricsRunnable.Remove event =
-                    new TopologyMetricsRunnable.Remove();
+            Remove event = new Remove();
             event.topologyId = topologyId;
-
             data.getMetricRunnable().pushEvent(event);
         } catch (NotAliveException e) {
-            String errMsg =
-                    "KillTopology Error, no this topology " + topologyName;
+            String errMsg = "KillTopology Error, no this topology " + 
topologyName;
             LOG.error(errMsg, e);
             throw new NotAliveException(errMsg);
         } catch (Exception e) {
@@ -345,16 +307,13 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
 
     /**
      * set topology status as active
-     * 
-     * @param topologyname
-     * 
+     *
+     * @param topologyName
      */
     @Override
-    public void activate(String topologyName) throws NotAliveException,
-            TException {
+    public void activate(String topologyName) throws TException, 
NotAliveException {
         try {
-            NimbusUtils.transitionName(data, topologyName, true,
-                    StatusType.activate);
+            NimbusUtils.transitionName(data, topologyName, true, 
StatusType.activate);
         } catch (NotAliveException e) {
             String errMsg = "Activate Error, no this topology " + topologyName;
             LOG.error(errMsg, e);
@@ -369,20 +328,15 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
 
     /**
      * set topology stauts as deactive
-     * 
-     * @param topologyname
-     * 
+     *
+     * @param topologyName
      */
     @Override
-    public void deactivate(String topologyName) throws NotAliveException,
-            TException {
-
+    public void deactivate(String topologyName) throws TException, 
NotAliveException {
         try {
-            NimbusUtils.transitionName(data, topologyName, true,
-                    StatusType.inactivate);
+            NimbusUtils.transitionName(data, topologyName, true, 
StatusType.inactivate);
         } catch (NotAliveException e) {
-            String errMsg =
-                    "Deactivate Error, no this topology " + topologyName;
+            String errMsg = "Deactivate Error, no this topology " + 
topologyName;
             LOG.error(errMsg, e);
             throw new NotAliveException(errMsg);
         } catch (Exception e) {
@@ -395,20 +349,16 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
 
     /**
      * rebalance one topology
-     * 
+     *
+     * @param topologyName topology name
+     * @param options      RebalanceOptions
      * @@@ rebalance options hasn't implements
-     * 
-     *     It is used to let workers wait several seconds to finish jobs
-     * 
-     * @param topologyname String
-     * @param options RebalanceOptions
+     * <p/>
+     * It is used to let workers wait several seconds to finish jobs
      */
     @Override
-    public void rebalance(String topologyName, RebalanceOptions options)
-            throws NotAliveException, TException, InvalidTopologyException {
-
+    public void rebalance(String topologyName, RebalanceOptions options) 
throws TException, NotAliveException {
         try {
-
             checkTopologyActive(data, topologyName, true);
             Integer wait_amt = null;
             String jsonConf = null;
@@ -422,15 +372,11 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
                     jsonConf = options.get_conf();
             }
 
-            LOG.info("Begin to rebalance " + topologyName + "wait_time:"
-                    + wait_amt + ", reassign: " + reassign
-                    + ", new worker/bolt configuration:" + jsonConf);
+            LOG.info("Begin to rebalance " + topologyName + "wait_time:" + 
wait_amt + ", reassign: " + reassign + ", new worker/bolt configuration:" + 
jsonConf);
 
-            Map<Object, Object> conf =
-                    (Map<Object, Object>) JStormUtils.from_json(jsonConf);
+            Map<Object, Object> conf = (Map<Object, Object>) 
JStormUtils.from_json(jsonConf);
 
-            NimbusUtils.transitionName(data, topologyName, true,
-                    StatusType.rebalance, wait_amt, reassign, conf);
+            NimbusUtils.transitionName(data, topologyName, true, 
StatusType.rebalance, wait_amt, reassign, conf);
         } catch (NotAliveException e) {
             String errMsg = "Rebalance Error, no this topology " + 
topologyName;
             LOG.error(errMsg, e);
@@ -444,13 +390,12 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
     }
 
     @Override
-    public void restart(String name, String jsonConf) throws NotAliveException,
-            InvalidTopologyException, TopologyAssignException, TException {
+    public void restart(String name, String jsonConf) throws TException, 
NotAliveException, InvalidTopologyException, TopologyAssignException {
         LOG.info("Begin to restart " + name + ", new configuration:" + 
jsonConf);
 
         // 1. get topologyId
         StormClusterState stormClusterState = data.getStormClusterState();
-        String topologyId = null;
+        String topologyId;
         try {
             topologyId = Cluster.get_topology_id(stormClusterState, name);
         } catch (Exception e2) {
@@ -468,30 +413,26 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
         LOG.info("Deactivate " + name);
 
         // 3. backup old jar/configuration/topology
-        StormTopology topology = null;
-
-        Map topologyConf = null;
+        StormTopology topology;
+        Map topologyConf;
         String topologyCodeLocation = null;
         try {
             topology = StormConfig.read_nimbus_topology_code(conf, topologyId);
-
-            topologyConf =
-                    StormConfig.read_nimbus_topology_conf(conf, topologyId);
+            topologyConf = StormConfig.read_nimbus_topology_conf(conf, 
topologyId);
             if (jsonConf != null) {
-                Map<Object, Object> newConf =
-                        (Map<Object, Object>) JStormUtils.from_json(jsonConf);
+                Map<Object, Object> newConf = (Map<Object, Object>) 
JStormUtils.from_json(jsonConf);
                 topologyConf.putAll(newConf);
             }
 
             // Copy storm files back to stormdist dir from the tmp dir
-            String oldDistDir =
-                    StormConfig.masterStormdistRoot(conf, topologyId);
+            String oldDistDir = StormConfig.masterStormdistRoot(conf, 
topologyId);
             String parent = StormConfig.masterInbox(conf);
             topologyCodeLocation = parent + PathUtils.SEPERATOR + topologyId;
             FileUtils.forceMkdir(new File(topologyCodeLocation));
             FileUtils.cleanDirectory(new File(topologyCodeLocation));
-            FileUtils.copyDirectory(new File(oldDistDir), new File(
-                    topologyCodeLocation));
+            File stormDistDir = new File(oldDistDir);
+            stormDistDir.setLastModified(System.currentTimeMillis());
+            FileUtils.copyDirectory(stormDistDir, new 
File(topologyCodeLocation));
 
             LOG.info("Successfully read old jar/conf/topology " + name);
         } catch (Exception e) {
@@ -499,8 +440,7 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
             if (topologyCodeLocation != null) {
                 try {
                     PathUtils.rmr(topologyCodeLocation);
-                } catch (IOException e1) {
-
+                } catch (IOException ignored) {
                 }
             }
             throw new TException("Failed to read old jar/conf/topology ");
@@ -509,24 +449,31 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
 
         // 4. Kill
         // directly use remove command to kill, more stable than issue kill cmd
-        RemoveTransitionCallback killCb =
-                new RemoveTransitionCallback(data, topologyId);
+        RemoveTransitionCallback killCb = new RemoveTransitionCallback(data, 
topologyId);
         killCb.execute(new Object[0]);
         LOG.info("Successfully kill the topology " + name);
 
+        // send metric events
+        TopologyMetricsRunnable.KillTopologyEvent killEvent = new 
TopologyMetricsRunnable.KillTopologyEvent();
+        killEvent.clusterName = this.data.getClusterName();
+        killEvent.topologyId = topologyId;
+        killEvent.timestamp = System.currentTimeMillis();
+        this.data.getMetricRunnable().pushEvent(killEvent);
+
+        Remove removeEvent = new Remove();
+        removeEvent.topologyId = topologyId;
+        this.data.getMetricRunnable().pushEvent(removeEvent);
+
         // 5. submit
         try {
-            submitTopology(name, topologyCodeLocation,
-                    JStormUtils.to_json(topologyConf), topology);
-
+            submitTopology(name, topologyCodeLocation, 
JStormUtils.to_json(topologyConf), topology);
         } catch (AlreadyAliveException e) {
             LOG.info("Failed to kill the topology" + name);
             throw new TException("Failed to kill the topology" + name);
         } finally {
             try {
                 PathUtils.rmr(topologyCodeLocation);
-            } catch (IOException e1) {
-
+            } catch (IOException ignored) {
             }
         }
 
@@ -537,11 +484,9 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
         try {
             String parent = PathUtils.parent_path(libName);
             PathUtils.local_mkdirs(parent);
-            data.getUploaders().put(libName,
-                    Channels.newChannel(new FileOutputStream(libName)));
+            data.getUploaders().put(libName, Channels.newChannel(new 
FileOutputStream(libName)));
             LOG.info("Begin upload file from client to " + libName);
         } catch (Exception e) {
-            // TODO Auto-generated catch block
             LOG.error("Fail to upload jar " + libName, e);
             throw new TException(e);
         }
@@ -549,23 +494,20 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
 
     /**
      * prepare to uploading topology jar, return the file location
-     * 
-     * @throws
      */
     @Override
     public String beginFileUpload() throws TException {
 
         String fileLoc = null;
         try {
-            String path = null;
+            String path;
             String key = UUID.randomUUID().toString();
             path = StormConfig.masterInbox(conf) + "/" + key;
             FileUtils.forceMkdir(new File(path));
             FileUtils.cleanDirectory(new File(path));
             fileLoc = path + "/stormjar-" + key + ".jar";
 
-            data.getUploaders().put(fileLoc,
-                    Channels.newChannel(new FileOutputStream(fileLoc)));
+            data.getUploaders().put(fileLoc, Channels.newChannel(new 
FileOutputStream(fileLoc)));
             LOG.info("Begin upload file from client to " + fileLoc);
             return path;
         } catch (FileNotFoundException e) {
@@ -581,14 +523,11 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
      * uploading topology jar data
      */
     @Override
-    public void uploadChunk(String location, ByteBuffer chunk)
-            throws TException {
+    public void uploadChunk(String location, ByteBuffer chunk) throws 
TException {
         TimeCacheMap<Object, Object> uploaders = data.getUploaders();
         Object obj = uploaders.get(location);
         if (obj == null) {
-            throw new TException(
-                    "File for that location does not exist (or timed out) "
-                            + location);
+            throw new TException("File for that location does not exist (or 
timed out) " + location);
         }
         try {
             if (obj instanceof WritableByteChannel) {
@@ -596,13 +535,10 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
                 channel.write(chunk);
                 uploaders.put(location, channel);
             } else {
-                throw new TException("Object isn't WritableByteChannel for "
-                        + location);
+                throw new TException("Object isn't WritableByteChannel for " + 
location);
             }
         } catch (IOException e) {
-            String errMsg =
-                    " WritableByteChannel write filed when uploadChunk "
-                            + location;
+            String errMsg = " WritableByteChannel write filed when uploadChunk 
" + location;
             LOG.error(errMsg);
             throw new TException(e);
         }
@@ -611,12 +547,10 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
 
     @Override
     public void finishFileUpload(String location) throws TException {
-
         TimeCacheMap<Object, Object> uploaders = data.getUploaders();
         Object obj = uploaders.get(location);
         if (obj == null) {
-            throw new TException(
-                    "File for that location does not exist (or timed out)");
+            throw new TException("File for that location does not exist (or 
timed out)");
         }
         try {
             if (obj instanceof WritableByteChannel) {
@@ -625,25 +559,20 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
                 uploaders.remove(location);
                 LOG.info("Finished uploading file from client: " + location);
             } else {
-                throw new TException("Object isn't WritableByteChannel for "
-                        + location);
+                throw new TException("Object isn't WritableByteChannel for " + 
location);
             }
         } catch (IOException e) {
-            LOG.error(" WritableByteChannel close failed when finishFileUpload 
"
-                    + location);
+            LOG.error(" WritableByteChannel close failed when finishFileUpload 
" + location);
         }
 
     }
 
     @Override
     public String beginFileDownload(String file) throws TException {
-        BufferFileInputStream is = null;
-        String id = null;
+        BufferFileInputStream is;
+        String id;
         try {
-            int bufferSize =
-                    JStormUtils.parseInt(
-                            conf.get(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE),
-                            1024 * 1024) / 2;
+            int bufferSize = 
JStormUtils.parseInt(conf.get(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE), 1024 * 
1024) / 2;
 
             is = new BufferFileInputStream(file, bufferSize);
             id = UUID.randomUUID().toString();
@@ -670,16 +599,14 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
                 BufferFileInputStream is = (BufferFileInputStream) obj;
                 byte[] ret = is.read();
                 if (ret != null) {
-                    downloaders.put(id, (BufferFileInputStream) is);
+                    downloaders.put(id, is);
                     return ByteBuffer.wrap(ret);
                 }
             } else {
-                throw new TException("Object isn't BufferFileInputStream for "
-                        + id);
+                throw new TException("Object isn't BufferFileInputStream for " 
+ id);
             }
         } catch (IOException e) {
-            LOG.error("BufferFileInputStream read failed when downloadChunk ",
-                    e);
+            LOG.error("BufferFileInputStream read failed when downloadChunk ", 
e);
             throw new TException(e);
         }
         byte[] empty = {};
@@ -692,72 +619,49 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
     }
 
     /**
-     * get cluster's summary, it will contain SupervisorSummary and
-     * TopologySummary
-     * 
+     * get cluster's summary, it will contain SupervisorSummary and 
TopologySummary
+     *
      * @return ClusterSummary
      */
     @Override
     public ClusterSummary getClusterInfo() throws TException {
         long start = System.nanoTime();
         try {
-
             StormClusterState stormClusterState = data.getStormClusterState();
 
-            Map<String, Assignment> assignments =
-                    new HashMap<String, Assignment>();
+            Map<String, Assignment> assignments = new HashMap<String, 
Assignment>();
 
             // get TopologySummary
-            List<TopologySummary> topologySummaries =
-                    NimbusUtils.getTopologySummary(stormClusterState,
-                            assignments);
+            List<TopologySummary> topologySummaries = 
NimbusUtils.getTopologySummary(stormClusterState, assignments);
 
             // all supervisors
-            Map<String, SupervisorInfo> supervisorInfos =
-                    Cluster.get_all_SupervisorInfo(stormClusterState, null);
+            Map<String, SupervisorInfo> supervisorInfos = 
Cluster.get_all_SupervisorInfo(stormClusterState, null);
 
             // generate SupervisorSummaries
-            List<SupervisorSummary> supervisorSummaries =
-                    NimbusUtils.mkSupervisorSummaries(supervisorInfos,
-                            assignments);
-
-            NimbusSummary nimbusSummary =
-                    NimbusUtils.getNimbusSummary(stormClusterState,
-                            supervisorSummaries, data);
+            List<SupervisorSummary> supervisorSummaries = 
NimbusUtils.mkSupervisorSummaries(supervisorInfos, assignments);
 
-            ClusterSummary ret =
-                    new ClusterSummary(nimbusSummary, supervisorSummaries,
-                            topologySummaries);
-
-            return ret;
+            NimbusSummary nimbusSummary = 
NimbusUtils.getNimbusSummary(stormClusterState, supervisorSummaries, data);
 
+            return new ClusterSummary(nimbusSummary, supervisorSummaries, 
topologySummaries);
         } catch (TException e) {
             LOG.info("Failed to get ClusterSummary ", e);
             throw e;
         } catch (Exception e) {
             LOG.info("Failed to get ClusterSummary ", e);
             throw new TException(e);
-        }finally {
-            double spend = (System.nanoTime() - start)/1000000.0d;
-            SimpleJStormMetric.updateHistorgram("getClusterInfo", spend);
-            LOG.info("getClusterInfo spend {}ms", spend);
+        } finally {
+            long end = System.nanoTime();
+            SimpleJStormMetric.updateNimbusHistogram("getClusterInfo", (end - 
start) / TimeUtils.NS_PER_US);
         }
     }
 
     @Override
     public String getVersion() throws TException {
-       try {
-            return Utils.getVersion();
-           }catch(Exception e) {
-                   String errMsg = "!!! Binary has been changed, please 
restart Nimbus !!! ";
-                   LOG.error(errMsg, e);
-               throw new TException(errMsg, e);
-           }
+        return Utils.getVersion();
     }
 
     @Override
-    public SupervisorWorkers getSupervisorWorkers(String host)
-            throws NotAliveException, TException {
+    public SupervisorWorkers getSupervisorWorkers(String host) throws 
NotAliveException, TException {
         long start = System.nanoTime();
         try {
 
@@ -770,15 +674,12 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
             String hostName = NetWorkUtils.ip2Host(host);
 
             // all supervisors
-            Map<String, SupervisorInfo> supervisorInfos =
-                    Cluster.get_all_SupervisorInfo(stormClusterState, null);
+            Map<String, SupervisorInfo> supervisorInfos = 
Cluster.get_all_SupervisorInfo(stormClusterState, null);
 
-            for (Entry<String, SupervisorInfo> entry : supervisorInfos
-                    .entrySet()) {
+            for (Entry<String, SupervisorInfo> entry : 
supervisorInfos.entrySet()) {
 
                 SupervisorInfo info = entry.getValue();
-                if (info.getHostName().equals(hostName)
-                        || info.getHostName().equals(ip)) {
+                if (info.getHostName().equals(hostName) || 
info.getHostName().equals(ip)) {
                     supervisorId = entry.getKey();
                     supervisorInfo = info;
                     break;
@@ -789,29 +690,20 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
                 throw new TException("No supervisor of " + host);
             }
 
-            Map<String, Assignment> assignments =
-                    Cluster.get_all_assignment(stormClusterState, null);
-
-            Map<Integer, WorkerSummary> portWorkerSummarys =
-                    new TreeMap<Integer, WorkerSummary>();
+            Map<String, Assignment> assignments = 
Cluster.get_all_assignment(stormClusterState, null);
 
-            Map<String, MetricInfo> metricInfoMap =
-                    new HashMap<String, MetricInfo>();
+            Map<Integer, WorkerSummary> portWorkerSummarys = new 
TreeMap<Integer, WorkerSummary>();
 
             int usedSlotNumber = 0;
 
-            Map<String, Map<Integer, String>> topologyTaskToComponent =
-                    new HashMap<String, Map<Integer, String>>();
+            Map<String, Map<Integer, String>> topologyTaskToComponent = new 
HashMap<String, Map<Integer, String>>();
 
+            Map<String, MetricInfo> metricInfoMap = new HashMap<String, 
MetricInfo>();
             for (Entry<String, Assignment> entry : assignments.entrySet()) {
                 String topologyId = entry.getKey();
                 Assignment assignment = entry.getValue();
 
                 Set<ResourceWorkerSlot> workers = assignment.getWorkers();
-                
-                TopologyMetric topologyMetric = 
data.getMetricRunnable().getTopologyMetric(topologyId);
-                
-
                 for (ResourceWorkerSlot worker : workers) {
                     if (supervisorId.equals(worker.getNodeId()) == false) {
                         continue;
@@ -829,67 +721,52 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
                         portWorkerSummarys.put(port, workerSummary);
                     }
 
-                    Map<Integer, String> taskToComponent =
-                            topologyTaskToComponent.get(topologyId);
+                    Map<Integer, String> taskToComponent = 
topologyTaskToComponent.get(topologyId);
                     if (taskToComponent == null) {
                         taskToComponent = 
Cluster.get_all_task_component(stormClusterState, topologyId, null);
-                        topologyTaskToComponent
-                                .put(topologyId, taskToComponent);
+                        topologyTaskToComponent.put(topologyId, 
taskToComponent);
                     }
 
                     int earliest = TimeUtils.current_time_secs();
                     for (Integer taskId : worker.getTasks()) {
-
                         TaskComponent taskComponent = new TaskComponent();
-                        taskComponent
-                                .set_component(taskToComponent.get(taskId));
+                        
taskComponent.set_component(taskToComponent.get(taskId));
                         taskComponent.set_taskId(taskId);
-                        Integer startTime =
-                                assignment.getTaskStartTimeSecs().get(taskId);
+                        Integer startTime = 
assignment.getTaskStartTimeSecs().get(taskId);
                         if (startTime != null && startTime < earliest) {
                             earliest = startTime;
                         }
 
                         workerSummary.add_to_tasks(taskComponent);
-
                     }
 
                     workerSummary.set_uptime(TimeUtils.time_delta(earliest));
 
-                    if (topologyMetric == null) {
-                        LOG.warn("Failed to get topologyMetric of " + 
topologyId);
-                        continue;
-                    }
-
-                    String workerSlotName =
-                            TopologyMetricsRunnable.getWorkerSlotName(
-                                    supervisorInfo.getHostName(), port);
-                    if (topologyMetric.get_workerMetric() != null) {
-                        MetricInfo workerMetricInfo =
-                                topologyMetric.get_workerMetric().get(
-                                        workerSlotName);
-
-                        if (workerMetricInfo != null) {
-                            metricInfoMap.put(workerSlotName, 
workerMetricInfo);
+                    String workerSlotName = 
getWorkerSlotName(supervisorInfo.getHostName(), port);
+                    List<MetricInfo> workerMetricInfoList = 
this.data.getMetricCache().getMetricData(topologyId, MetaType.WORKER);
+                    if (workerMetricInfoList.size() > 0) {
+                        MetricInfo workerMetricInfo = 
workerMetricInfoList.get(0);
+                        // remove metrics that don't belong to current worker
+                        for (Iterator<String> itr = 
workerMetricInfo.get_metrics().keySet().iterator();
+                             itr.hasNext(); ) {
+                            String metricName = itr.next();
+                            if (!metricName.contains(host)) {
+                                itr.remove();
+                            }
                         }
+                        metricInfoMap.put(workerSlotName, workerMetricInfo);
                     }
                 }
             }
 
-            List<WorkerSummary> wokersList = new ArrayList<WorkerSummary>();
-            wokersList.addAll(portWorkerSummarys.values());
+            List<WorkerSummary> workerList = new ArrayList<WorkerSummary>();
+            workerList.addAll(portWorkerSummarys.values());
 
-            Map<String, Integer> supervisorToUsedSlotNum =
-                    new HashMap<String, Integer>();
+            Map<String, Integer> supervisorToUsedSlotNum = new HashMap<String, 
Integer>();
             supervisorToUsedSlotNum.put(supervisorId, usedSlotNumber);
-            SupervisorSummary supervisorSummary =
-                    NimbusUtils.mkSupervisorSummary(supervisorInfo,
-                            supervisorId, supervisorToUsedSlotNum);
+            SupervisorSummary supervisorSummary = 
NimbusUtils.mkSupervisorSummary(supervisorInfo, supervisorId, 
supervisorToUsedSlotNum);
 
-            SupervisorWorkers ret =
-                    new SupervisorWorkers(supervisorSummary, wokersList,
-                            metricInfoMap);
-            return ret;
+            return new SupervisorWorkers(supervisorSummary, workerList, 
metricInfoMap);
 
         } catch (TException e) {
             LOG.info("Failed to get ClusterSummary ", e);
@@ -897,21 +774,19 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
         } catch (Exception e) {
             LOG.info("Failed to get ClusterSummary ", e);
             throw new TException(e);
-        }finally {
-            double spend = (System.nanoTime() - start)/1000000.0d;
-            SimpleJStormMetric.updateHistorgram("getSupervisorWorkers", spend);
-            LOG.info("getSupervisorWorkers, {} spend {} ms", host, spend);
+        } finally {
+            long end = System.nanoTime();
+            SimpleJStormMetric.updateNimbusHistogram("getSupervisorWorkers", 
(end - start) / TimeUtils.NS_PER_US);
         }
     }
 
     /**
      * Get TopologyInfo, it contain all data of the topology running status
-     * 
+     *
      * @return TopologyInfo
      */
     @Override
-    public TopologyInfo getTopologyInfo(String topologyId)
-            throws NotAliveException, TException {
+    public TopologyInfo getTopologyInfo(String topologyId) throws 
NotAliveException, TException {
         long start = System.nanoTime();
         StormClusterState stormClusterState = data.getStormClusterState();
 
@@ -923,49 +798,40 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
                 throw new NotAliveException("No topology of " + topologyId);
             }
 
-            Assignment assignment =
-                    stormClusterState.assignment_info(topologyId, null);
+            Assignment assignment = 
stormClusterState.assignment_info(topologyId, null);
             if (assignment == null) {
                 throw new NotAliveException("No topology of " + topologyId);
             }
-            
 
-            Map<String, TaskHeartbeat> taskHBMap =
-                    Cluster.get_all_task_heartbeat(stormClusterState,
-                            topologyId);
+            TopologyTaskHbInfo topologyTaskHbInfo = 
data.getTasksHeartbeat().get(topologyId);
+            Map<Integer, TaskHeartbeat> taskHbMap = null;
+            if (topologyTaskHbInfo != null)
+                taskHbMap = topologyTaskHbInfo.get_taskHbs();
 
             Map<Integer, TaskInfo> taskInfoMap = 
Cluster.get_all_taskInfo(stormClusterState, topologyId);
             Map<Integer, String> taskToComponent = 
Cluster.get_all_task_component(stormClusterState, topologyId, taskInfoMap);
             Map<Integer, String> taskToType = 
Cluster.get_all_task_type(stormClusterState, topologyId, taskInfoMap);
-            
 
-            String errorString = null;
+
+            String errorString;
             if (Cluster.is_topology_exist_error(stormClusterState, 
topologyId)) {
                 errorString = "Y";
             } else {
                 errorString = "";
             }
-            
+
             TopologySummary topologySummary = new TopologySummary();
             topologySummary.set_id(topologyId);
             topologySummary.set_name(base.getStormName());
-            topologySummary.set_uptime_secs(TimeUtils.time_delta(base
-                    .getLanchTimeSecs()));
-            ;
+            
topologySummary.set_uptimeSecs(TimeUtils.time_delta(base.getLanchTimeSecs()));
             topologySummary.set_status(base.getStatusString());
-            topologySummary.set_num_tasks(NimbusUtils
-                    .getTopologyTaskNum(assignment));
-            topologySummary.set_num_workers(assignment.getWorkers().size());
-            topologySummary.set_error_info(errorString);
-
-            Map<String, ComponentSummary> componentSummaryMap =
-                    new HashMap<String, ComponentSummary>();
+            
topologySummary.set_numTasks(NimbusUtils.getTopologyTaskNum(assignment));
+            topologySummary.set_numWorkers(assignment.getWorkers().size());
+            topologySummary.set_errorInfo(errorString);
 
-            HashMap<String, List<Integer>> componentToTasks =
-                    JStormUtils.reverse_map(taskToComponent);
-
-            for (Entry<String, List<Integer>> entry : componentToTasks
-                    .entrySet()) {
+            Map<String, ComponentSummary> componentSummaryMap = new 
HashMap<String, ComponentSummary>();
+            HashMap<String, List<Integer>> componentToTasks = 
JStormUtils.reverse_map(taskToComponent);
+            for (Entry<String, List<Integer>> entry : 
componentToTasks.entrySet()) {
                 String name = entry.getKey();
                 List<Integer> taskIds = entry.getValue();
                 if (taskIds == null || taskIds.size() == 0) {
@@ -979,43 +845,46 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
                 componentSummary.set_name(name);
                 componentSummary.set_type(taskToType.get(taskIds.get(0)));
                 componentSummary.set_parallel(taskIds.size());
-                componentSummary.set_task_ids(taskIds);
+                componentSummary.set_taskIds(taskIds);
             }
 
-            Map<Integer, TaskSummary> taskSummaryMap =
-                    new TreeMap<Integer, TaskSummary>();
-            Map<Integer, List<TaskError> > taskErrors = 
Cluster.get_all_task_errors(
-                       stormClusterState, topologyId);
+            Map<Integer, TaskSummary> taskSummaryMap = new TreeMap<Integer, 
TaskSummary>();
+            Map<Integer, List<TaskError>> taskErrors = 
Cluster.get_all_task_errors(stormClusterState, topologyId);
 
             for (Integer taskId : taskInfoMap.keySet()) {
                 TaskSummary taskSummary = new TaskSummary();
                 taskSummaryMap.put(taskId, taskSummary);
 
-                taskSummary.set_task_id(taskId);
-                TaskHeartbeat hb = taskHBMap.get(String.valueOf(taskId));
-                if (hb == null) {
+                taskSummary.set_taskId(taskId);
+                if (taskHbMap == null) {
                     taskSummary.set_status("Starting");
                     taskSummary.set_uptime(0);
                 } else {
-                    taskSummary.set_status("ACTIVE");
-                    taskSummary.set_uptime(hb.getUptimeSecs());
+                    TaskHeartbeat hb = taskHbMap.get(taskId);
+                    if (hb == null) {
+                        taskSummary.set_status("Starting");
+                        taskSummary.set_uptime(0);
+                    } else {
+                        boolean isInactive = NimbusUtils.isTaskDead(data, 
topologyId, taskId);
+                        if (isInactive)
+                            taskSummary.set_status("INACTIVE");
+                        else
+                            taskSummary.set_status("ACTIVE");
+                        taskSummary.set_uptime(hb.get_uptime());
+                    }
                 }
 
                 if (StringUtils.isBlank(errorString)) {
-                       continue;
+                    continue;
                 }
+
                 List<TaskError> taskErrorList = taskErrors.get(taskId);
                 if (taskErrorList != null && taskErrorList.size() != 0) {
                     for (TaskError taskError : taskErrorList) {
-                        ErrorInfo errorInfo =
-                                new ErrorInfo(taskError.getError(),
-                                        taskError.getTimSecs());
-
+                        ErrorInfo errorInfo = new 
ErrorInfo(taskError.getError(), taskError.getTimSecs());
                         taskSummary.add_to_errors(errorInfo);
-
                         String component = taskToComponent.get(taskId);
-                        componentSummaryMap.get(component).add_to_errors(
-                                errorInfo);
+                        
componentSummaryMap.get(component).add_to_errors(errorInfo);
                     }
                 }
             }
@@ -1033,14 +902,38 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
 
             TopologyInfo topologyInfo = new TopologyInfo();
             topologyInfo.set_topology(topologySummary);
-            topologyInfo.set_components(JStormUtils.mk_list(componentSummaryMap
-                    .values()));
-            topologyInfo
-                    .set_tasks(JStormUtils.mk_list(taskSummaryMap.values()));
-            topologyInfo.set_metrics(data.getMetricRunnable()
-                    .getTopologyMetric(topologyId));
-
-            
+            
topologyInfo.set_components(JStormUtils.mk_list(componentSummaryMap.values()));
+            
topologyInfo.set_tasks(JStormUtils.mk_list(taskSummaryMap.values()));
+
+            // return topology metric & component metric only
+            List<MetricInfo> tpMetricList = 
data.getMetricCache().getMetricData(topologyId, MetaType.TOPOLOGY);
+            List<MetricInfo> compMetricList = 
data.getMetricCache().getMetricData(topologyId, MetaType.COMPONENT);
+            List<MetricInfo> workerMetricList = 
data.getMetricCache().getMetricData(topologyId, MetaType.WORKER);
+            MetricInfo taskMetric = MetricUtils.mkMetricInfo();
+            MetricInfo streamMetric = MetricUtils.mkMetricInfo();
+            MetricInfo nettyMetric = MetricUtils.mkMetricInfo();
+            MetricInfo tpMetric, compMetric, workerMetric;
+
+            if (tpMetricList == null || tpMetricList.size() == 0) {
+                tpMetric = MetricUtils.mkMetricInfo();
+            } else {
+                // get the last min topology metric
+                tpMetric = tpMetricList.get(tpMetricList.size() - 1);
+            }
+            if (compMetricList == null || compMetricList.size() == 0) {
+                compMetric = MetricUtils.mkMetricInfo();
+            } else {
+                compMetric = compMetricList.get(0);
+            }
+            if (workerMetricList == null || workerMetricList.size() == 0) {
+                workerMetric = MetricUtils.mkMetricInfo();
+            } else {
+                workerMetric = workerMetricList.get(0);
+            }
+            TopologyMetric topologyMetrics = new TopologyMetric(tpMetric, 
compMetric, workerMetric,
+                    taskMetric, streamMetric, nettyMetric);
+            topologyInfo.set_metrics(topologyMetrics);
+
             return topologyInfo;
         } catch (TException e) {
             LOG.info("Failed to get topologyInfo " + topologyId, e);
@@ -1048,19 +941,15 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
         } catch (Exception e) {
             LOG.info("Failed to get topologyInfo " + topologyId, e);
             throw new TException("Failed to get topologyInfo" + topologyId);
-        }finally {
+        } finally {
             long end = System.nanoTime();
-            double spend = (end - start)/1000000.0d;
-            SimpleJStormMetric.updateHistorgram("getTopologyInfo", spend);
-            LOG.info("Finish getTopologyInfo {}, spend {} ms", topologyId, 
spend);
+            SimpleJStormMetric.updateNimbusHistogram("getTopologyInfo", (end - 
start) / TimeUtils.NS_PER_US);
         }
 
     }
 
     @Override
-    public TopologyInfo getTopologyInfoByName(String topologyName)
-            throws NotAliveException, TException {
-
+    public TopologyInfo getTopologyInfoByName(String topologyName) throws 
NotAliveException, TException {
         String topologyId = getTopologyId(topologyName);
         return getTopologyInfo(topologyId);
 
@@ -1069,8 +958,7 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
     @Override
     public String getNimbusConf() throws TException {
         try {
-            String ret = JStormUtils.to_json(data.getConf());
-            return ret;
+            return JStormUtils.to_json(data.getConf());
         } catch (Exception e) {
             String err = "Failed to generate Nimbus configuration";
             LOG.error(err, e);
@@ -1080,20 +968,17 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
 
     /**
      * get topology configuration
-     * 
+     *
      * @param id String: topology id
      * @return String
      */
     @Override
-    public String getTopologyConf(String id) throws NotAliveException,
-            TException {
+    public String getTopologyConf(String id) throws NotAliveException, 
TException {
         String rtn;
         try {
-            Map<Object, Object> topologyConf =
-                    StormConfig.read_nimbus_topology_conf(conf, id);
+            Map<Object, Object> topologyConf = 
StormConfig.read_nimbus_topology_conf(conf, id);
             rtn = JStormUtils.to_json(topologyConf);
         } catch (IOException e) {
-            // TODO Auto-generated catch block
             LOG.info("Failed to get configuration of " + id, e);
             throw new TException(e);
         }
@@ -1101,15 +986,12 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
     }
 
     @Override
-    public String getTopologyId(String topologyName) throws NotAliveException,
-            TException {
-        // TODO Auto-generated method stub
+    public String getTopologyId(String topologyName) throws NotAliveException, 
TException {
         StormClusterState stormClusterState = data.getStormClusterState();
 
         try {
             // get all active topology's StormBase
-            String topologyId =
-                    Cluster.get_topology_id(stormClusterState, topologyName);
+            String topologyId = Cluster.get_topology_id(stormClusterState, 
topologyName);
             if (topologyId != null) {
                 return topologyId;
             }
@@ -1125,24 +1007,20 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
 
     /**
      * get StormTopology throw deserialize local files
-     * 
+     *
      * @param id String: topology id
      * @return StormTopology
      */
     @Override
-    public StormTopology getTopology(String id) throws NotAliveException,
-            TException {
-        StormTopology topology = null;
+    public StormTopology getTopology(String id) throws NotAliveException, TException {
+        StormTopology topology;
         try {
-            StormTopology stormtopology =
-                    StormConfig.read_nimbus_topology_code(conf, id);
+            StormTopology stormtopology = StormConfig.read_nimbus_topology_code(conf, id);
             if (stormtopology == null) {
                 throw new NotAliveException("No topology of " + id);
             }
 
-            Map<Object, Object> topologyConf =
-                    (Map<Object, Object>) StormConfig
-                            .read_nimbus_topology_conf(conf, id);
+            Map<Object, Object> topologyConf = (Map<Object, Object>) StormConfig.read_nimbus_topology_conf(conf, id);
 
             topology = Common.system_topology(topologyConf, stormtopology);
         } catch (Exception e) {
@@ -1153,12 +1031,10 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
     }
 
     @Override
-    public StormTopology getUserTopology(String id) throws NotAliveException,
-            TException {
+    public StormTopology getUserTopology(String id) throws NotAliveException, TException {
         StormTopology topology = null;
         try {
-            StormTopology stormtopology =
-                    StormConfig.read_nimbus_topology_code(conf, id);
+            StormTopology stormtopology = StormConfig.read_nimbus_topology_code(conf, id);
             if (stormtopology == null) {
                 throw new NotAliveException("No topology of " + id);
             }
@@ -1173,35 +1049,32 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
 
     /**
      * check whether the topology is bActive?
-     * 
+     *
      * @param nimbus
      * @param topologyName
      * @param bActive
      * @throws Exception
      */
-    public void checkTopologyActive(NimbusData nimbus, String topologyName,
-            boolean bActive) throws Exception {
+    public void checkTopologyActive(NimbusData nimbus, String topologyName, boolean bActive) throws Exception {
         if (isTopologyActive(nimbus.getStormClusterState(), topologyName) != bActive) {
             if (bActive) {
                 throw new NotAliveException(topologyName + " is not alive");
             } else {
-                throw new AlreadyAliveException(topologyName
-                        + " is already active");
+                throw new AlreadyAliveException(topologyName + " is already active");
             }
         }
     }
 
     /**
      * whether the topology is active by topology name
-     * 
+     *
      * @param stormClusterState see Cluster_clj
      * @param topologyName
      * @return boolean if the storm is active, return true, otherwise return
-     *         false;
+     * false;
      * @throws Exception
      */
-    public boolean isTopologyActive(StormClusterState stormClusterState,
-            String topologyName) throws Exception {
+    public boolean isTopologyActive(StormClusterState stormClusterState, String topologyName) throws Exception {
         boolean rtn = false;
         if (Cluster.get_topology_id(stormClusterState, topologyName) != null) {
             rtn = true;
@@ -1213,7 +1086,7 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
      * create local topology files /local-dir/nimbus/topologyId/stormjar.jar
      * /local-dir/nimbus/topologyId/stormcode.ser
      * /local-dir/nimbus/topologyId/stormconf.ser
-     * 
+     *
      * @param conf
      * @param topologyId
      * @param tmpJarLocation
@@ -1221,9 +1094,8 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
      * @param topology
      * @throws IOException
      */
-    private void setupStormCode(Map<Object, Object> conf, String topologyId,
-            String tmpJarLocation, Map<Object, Object> stormConf,
-            StormTopology topology) throws IOException {
+    private void setupStormCode(Map<Object, Object> conf, String topologyId, String tmpJarLocation, Map<Object, Object> stormConf, StormTopology topology)
+            throws IOException {
         // local-dir/nimbus/stormdist/topologyId
         String stormroot = StormConfig.masterStormdistRoot(conf, topologyId);
 
@@ -1234,18 +1106,16 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
         setupJar(conf, tmpJarLocation, stormroot);
 
         // serialize to file /local-dir/nimbus/topologyId/stormcode.ser
-        FileUtils.writeByteArrayToFile(
-                new File(StormConfig.stormcode_path(stormroot)),
-                Utils.serialize(topology));
+        FileUtils.writeByteArrayToFile(new File(StormConfig.stormcode_path(stormroot)), Utils.serialize(topology));
 
         // serialize to file /local-dir/nimbus/topologyId/stormconf.ser
-        FileUtils.writeByteArrayToFile(
-                new File(StormConfig.stormconf_path(stormroot)),
-                Utils.serialize(stormConf));
+        FileUtils.writeByteArrayToFile(new File(StormConfig.stormconf_path(stormroot)), Utils.serialize(stormConf));
+
+        // Update downloadCode timeStamp
+        StormConfig.write_nimbus_topology_timestamp(data.getConf(), topologyId, System.currentTimeMillis());
     }
 
-    private boolean copyLibJars(String tmpJarLocation, String stormroot)
-            throws IOException {
+    private boolean copyLibJars(String tmpJarLocation, String stormroot) throws IOException {
         String srcLibPath = StormConfig.stormlib_path(tmpJarLocation);
         String destLibPath = StormConfig.stormlib_path(stormroot);
         LOG.info("Begin to copy from " + srcLibPath + " to " + destLibPath);
@@ -1265,14 +1135,13 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
 
     /**
      * Copy jar to /local-dir/nimbus/topologyId/stormjar.jar
-     * 
+     *
      * @param conf
      * @param tmpJarLocation
      * @param stormroot
      * @throws IOException
      */
-    private void setupJar(Map<Object, Object> conf, String tmpJarLocation,
-            String stormroot) throws IOException {
+    private void setupJar(Map<Object, Object> conf, String tmpJarLocation, String stormroot) throws IOException {
         if (!StormConfig.local_mode(conf)) {
             boolean existLibs = copyLibJars(tmpJarLocation, stormroot);
 
@@ -1287,8 +1156,7 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
 
             if (jarPath == null) {
                 if (existLibs == false) {
-                    throw new IllegalArgumentException("No jar under "
-                            + tmpJarLocation);
+                    throw new IllegalArgumentException("No jar under " + tmpJarLocation);
                 } else {
                     LOG.info("No submit jar");
                     return;
@@ -1297,86 +1165,71 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
 
             File srcFile = new File(jarPath);
             if (!srcFile.exists()) {
-                throw new IllegalArgumentException(jarPath + " to copy to "
-                        + stormroot + " does not exist!");
+                throw new IllegalArgumentException(jarPath + " to copy to " + stormroot + " does not exist!");
             }
 
             String path = StormConfig.stormjar_path(stormroot);
             File destFile = new File(path);
             FileUtils.copyFile(srcFile, destFile);
             srcFile.delete();
-
-            return;
         }
     }
 
     /**
      * generate TaskInfo for every bolt or spout in ZK /ZK/tasks/topoologyId/xxx
-     * 
+     *
      * @param conf
      * @param topologyId
      * @param stormClusterState
      * @throws Exception
      */
-    public void setupZkTaskInfo(Map<Object, Object> conf, String topologyId,
-            StormClusterState stormClusterState) throws Exception {
+    public void setupZkTaskInfo(Map<Object, Object> conf, String topologyId, StormClusterState stormClusterState) throws Exception {
+        Map<Integer, TaskInfo> taskToTaskInfo = mkTaskComponentAssignments(conf, topologyId);
 
         // mkdir /ZK/taskbeats/topoologyId
-        stormClusterState.setup_heartbeats(topologyId);
+        int masterId = NimbusUtils.getTopologyMasterId(taskToTaskInfo);
+        TopologyTaskHbInfo topoTaskHbinfo = new TopologyTaskHbInfo(topologyId, masterId);
+        data.getTasksHeartbeat().put(topologyId, topoTaskHbinfo);
+        stormClusterState.topology_heartbeat(topologyId, topoTaskHbinfo);
 
-        Map<Integer, TaskInfo> taskToTaskInfo =
-                mkTaskComponentAssignments(conf, topologyId);
         if (taskToTaskInfo == null || taskToTaskInfo.size() == 0) {
             throw new InvalidTopologyException("Failed to generate TaskIDs map");
         }
-            // key is taskid, value is taskinfo
+        // key is taskid, value is taskinfo
         stormClusterState.set_task(topologyId, taskToTaskInfo);
     }
 
     /**
      * generate a taskid(Integer) for every task
-     * 
+     *
      * @param conf
      * @param topologyid
      * @return Map<Integer, String>: from taskid to componentid
      * @throws IOException
      * @throws InvalidTopologyException
      */
-    public Map<Integer, TaskInfo> mkTaskComponentAssignments(
-            Map<Object, Object> conf, String topologyid) throws IOException,
-            InvalidTopologyException {
+    public Map<Integer, TaskInfo> mkTaskComponentAssignments(Map<Object, Object> conf, String topologyid) throws IOException, InvalidTopologyException {
 
         // @@@ here exist a little problem,
         // we can directly pass stormConf from Submit method
-        Map<Object, Object> stormConf =
-                StormConfig.read_nimbus_topology_conf(conf, topologyid);
-
-        StormTopology stopology =
-                StormConfig.read_nimbus_topology_code(conf, topologyid);
-
+        Map<Object, Object> stormConf = StormConfig.read_nimbus_topology_conf(conf, topologyid);
+        StormTopology stopology = StormConfig.read_nimbus_topology_code(conf, topologyid);
         StormTopology topology = Common.system_topology(stormConf, stopology);
 
-
         return Common.mkTaskInfo(stormConf, topology, topologyid);
     }
 
-    
-
     @Override
-    public void metricMonitor(String topologyName, MonitorOptions options)
-            throws NotAliveException, TException {
+    public void metricMonitor(String topologyName, MonitorOptions options) throws TException {
         boolean isEnable = options.is_isEnable();
         StormClusterState clusterState = data.getStormClusterState();
 
         try {
-            String topologyId =
-                    Cluster.get_topology_id(clusterState, topologyName);
+            String topologyId = Cluster.get_topology_id(clusterState, topologyName);
             if (null != topologyId) {
                 clusterState.set_storm_monitor(topologyId, isEnable);
             } else {
-                throw new NotAliveException(
-                        "Failed to update metricsMonitor status as "
-                                + topologyName + " is not alive");
+                throw new NotAliveException("Failed to update metricsMonitor status as " + topologyName + " is not alive");
             }
         } catch (Exception e) {
             String errMsg = "Failed to update metricsMonitor " + topologyName;
@@ -1387,137 +1240,263 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
     }
 
     @Override
-    public TopologyMetric getTopologyMetric(String topologyId)
-            throws NotAliveException, TException {
-        LOG.debug("Nimbus service handler, getTopologyMetric, topology ID: "
-                + topologyId);
+    public TopologyMetric getTopologyMetrics(String topologyId) throws TException {
+        LOG.debug("Nimbus service handler, getTopologyMetric, topology ID: " + topologyId);
         long start = System.nanoTime();
         try {
-            TopologyMetric metric =
-                    data.getMetricRunnable().getTopologyMetric(topologyId);
-    
-            return metric;
-        }finally {
-            double spend = ( System.nanoTime()- start)/1000000.0d;;
-            SimpleJStormMetric.updateHistorgram("getTopologyMetric", spend);
-            LOG.info("getTopologyMetric, {}:{}", topologyId, spend);
+            return data.getMetricRunnable().getTopologyMetric(topologyId);
+        } finally {
+            long end = System.nanoTime();
+            SimpleJStormMetric.updateNimbusHistogram("getTopologyMetric", (end - start) / TimeUtils.NS_PER_US);
         }
     }
 
     @Override
-    public void workerUploadMetric(WorkerUploadMetrics uploadMetrics)
-            throws TException {
-        // TODO Auto-generated method stub
-        LOG.debug("!!!!!!! workerUploadMetric:{}:{}:{} ", uploadMetrics.get_topology_id(),
-                uploadMetrics.get_supervisor_id(), uploadMetrics.get_port());
+    public void uploadTopologyMetrics(String topologyId, TopologyMetric uploadMetrics) throws TException {
+        LOG.info("Received topology metrics:{}", topologyId);
 
-        TopologyMetricsRunnable.Update event =
-                new TopologyMetricsRunnable.Update();
-        event.workerMetrics = uploadMetrics;
+        Update event = new Update();
+        event.timestamp = System.currentTimeMillis();
+        event.topologyMetrics = uploadMetrics;
+        event.topologyId = topologyId;
 
         data.getMetricRunnable().pushEvent(event);
     }
 
+    @Override
+    public Map<String, Long> registerMetrics(String topologyId, Set<String> metrics) throws TException {
+        try {
+            return data.getMetricRunnable().registerMetrics(topologyId, metrics);
+        } catch (Exception ex) {
+            return null;
+        }
+    }
+
     public void uploadNewCredentials(String topologyName, Credentials creds) {
-        // TODO Auto-generated method stub
-        
     }
 
     @Override
-    public NettyMetric getNettyMetric(String topologyName, int pos) throws TException {
-        // TODO Auto-generated method stub
-        long start = System.nanoTime();
-        try {
-            String topologyId = getTopologyId(topologyName);
-            
-            if (pos < 0) {
-                LOG.warn("Invalid pos {}, set it as 0", pos);
-                pos = 0;
-            }
-            SortedMap<String, MetricInfo> allConnections = data.getMetricRunnable().getNettyMetric(topologyId);
-            int mapSize = allConnections.size();
-            
-            NettyMetric ret = new NettyMetric();
-            
-            ret.set_connectionNum(mapSize);
-            
-            
-            Map<String, MetricInfo> selectConnections = new TreeMap<String, MetricInfo>();
-            ret.set_connections(selectConnections);
-            int i = 0;
-            int selectMapSize = 0;
-            for (Entry<String, MetricInfo> entry: allConnections.entrySet()) {
-                i++;
-                if (i <= pos) {
-                    continue;
+    public List<MetricInfo> getMetrics(String topologyId, int type) throws TException {
+        MetaType metaType = MetaType.parse(type);
+        return data.getMetricCache().getMetricData(topologyId, metaType);
+    }
+
+    @Override
+    public MetricInfo getNettyMetrics(String topologyId) throws TException {
+        List<MetricInfo> metricInfoList = data.getMetricCache().getMetricData(topologyId, MetaType.NETTY);
+        if (metricInfoList != null && metricInfoList.size() > 0) {
+            return metricInfoList.get(0);
+        }
+        return new MetricInfo();
+    }
+
+    @Override
+    public MetricInfo getNettyMetricsByHost(String topologyId, String host) throws TException {
+        MetricInfo ret = new MetricInfo();
+
+        List<MetricInfo> metricInfoList = data.getMetricCache().getMetricData(topologyId, MetaType.NETTY);
+        if (metricInfoList != null && metricInfoList.size() > 0) {
+            MetricInfo metricInfo = metricInfoList.get(0);
+            for (Entry<String, Map<Integer, MetricSnapshot>> metricEntry : metricInfo.get_metrics().entrySet()) {
+                String metricName = metricEntry.getKey();
+                Map<Integer, MetricSnapshot> data = metricEntry.getValue();
+                if (metricName.contains(host)) {
+                    ret.put_to_metrics(metricName, data);
                 }
-                
-                selectConnections.put(entry.getKey(), entry.getValue());
-                selectMapSize++;
-                if (selectMapSize >= MetricDef.NETTY_METRICS_PACKAGE_SIZE) {
-                    break;
+            }
+        }
+
+        LOG.info("getNettyMetricsByHost, total size:{}", ret.get_metrics_size());
+        return ret;
+    }
+
+    @Override
+    public int getNettyMetricSizeByHost(String topologyId, String host) throws TException {
+        return getNettyMetricsByHost(topologyId, host).get_metrics_size();
+    }
+
+    @Override
+    public MetricInfo getPagingNettyMetrics(String topologyId, String host, int page) throws TException {
+        MetricInfo ret = new MetricInfo();
+
+        int start = (page - 1) * MetricUtils.NETTY_METRIC_PAGE_SIZE;
+        int end = page * MetricUtils.NETTY_METRIC_PAGE_SIZE;
+        int cur = -1;
+        List<MetricInfo> metricInfoList = data.getMetricCache().getMetricData(topologyId, MetaType.NETTY);
+        if (metricInfoList != null && metricInfoList.size() > 0) {
+            MetricInfo metricInfo = metricInfoList.get(0);
+            for (Entry<String, Map<Integer, MetricSnapshot>> metricEntry : metricInfo.get_metrics().entrySet()) {
+                String metricName = metricEntry.getKey();
+                Map<Integer, MetricSnapshot> data = metricEntry.getValue();
+                if (metricName.contains(host)) {
+                    ++cur;
+                    if (cur >= start && cur < end) {
+                        ret.put_to_metrics(metricName, data);
+                    }
+                    if (cur >= end) {
+                        break;
+                    }
                 }
             }
-                       
-            return ret;
-        }finally {
-            double spend = (System.nanoTime() - start)/1000000.0d;
-            SimpleJStormMetric.updateHistorgram("getNettyMetric", spend );
-            LOG.info("getNettyMetric, {}:{} ms", topologyName, spend);
         }
+
+        LOG.info("getNettyMetricsByHost, total size:{}", ret.get_metrics_size());
+        return ret;
     }
 
+    @Override
+    public MetricInfo getTaskMetrics(String topologyId, String component) throws TException {
+        List<MetricInfo> taskMetricList = getMetrics(topologyId, MetaType.TASK.getT());
+        if (taskMetricList != null && taskMetricList.size() > 0) {
+            MetricInfo metricInfo = taskMetricList.get(0);
+            Map<String, Map<Integer, MetricSnapshot>> metrics = metricInfo.get_metrics();
+            for (Iterator<String> itr = metrics.keySet().iterator(); itr.hasNext(); ) {
+                String metricName = itr.next();
+                String[] parts = metricName.split(MetricUtils.DELIM);
+                if (parts.length < 7 || !parts[2].equals(component)) {
+                    itr.remove();
+                }
+            }
+            LOG.info("taskMetric, total size:{}", metricInfo.get_metrics_size());
+            return metricInfo;
+        }
+        return MetricUtils.mkMetricInfo();
+    }
 
     @Override
-    public NettyMetric getServerNettyMetric(String topologyName, String serverName) throws TException {
-        // TODO Auto-generated method stub
-        long start = System.nanoTime();
-        try {
-            String topologyId = getTopologyId(topologyName);
-            
-            SortedMap<String, MetricInfo> allConnections = data.getMetricRunnable().getNettyMetric(topologyId);
-            int mapSize = allConnections.size();
-            
-            NettyMetric ret = new NettyMetric();
-            
-            String serverIp = NetWorkUtils.host2Ip(serverName);    
-            Map<String, MetricInfo> selectConnections = new TreeMap<String, MetricInfo>();
-            for (Entry<String, MetricInfo> entry: allConnections.entrySet()) {
-                if (entry.getKey().contains(serverIp)) {
-                    selectConnections.put(entry.getKey(), entry.getValue());
+    public List<MetricInfo> getTaskAndStreamMetrics(String topologyId, int taskId) throws TException {
+        List<MetricInfo> taskMetricList = getMetrics(topologyId, MetaType.TASK.getT());
+        List<MetricInfo> streamMetricList = getMetrics(topologyId, MetaType.STREAM.getT());
+
+        String taskIdStr = taskId + "";
+        MetricInfo taskMetricInfo;
+        if (taskMetricList != null && taskMetricList.size() > 0) {
+            taskMetricInfo = taskMetricList.get(0);
+            Map<String, Map<Integer, MetricSnapshot>> metrics = taskMetricInfo.get_metrics();
+            for (Iterator<String> itr = metrics.keySet().iterator(); itr.hasNext(); ) {
+                String metricName = itr.next();
+                String[] parts = metricName.split(MetricUtils.DELIM);
+                if (parts.length < 7 || !parts[3].equals(taskIdStr)) {
+                    itr.remove();
                 }
-                
             }
-             ret.set_connectionNum(selectConnections.size());
-             ret.set_connections(selectConnections);
-            return ret;
-        }finally {
-            double spend = (System.nanoTime()- start)/1000000.0d;
-            SimpleJStormMetric.updateHistorgram("getNettyMetric", spend);
-            LOG.info("getServerNettyMetric, {} : {}ms", topologyName, spend);
+        } else {
+            taskMetricInfo = MetricUtils.mkMetricInfo();
         }
+
+        MetricInfo streamMetricInfo;
+        if (streamMetricList != null && streamMetricList.size() > 0) {
+            streamMetricInfo = streamMetricList.get(0);
+            Map<String, Map<Integer, MetricSnapshot>> metrics = streamMetricInfo.get_metrics();
+            for (Iterator<String> itr = metrics.keySet().iterator(); itr.hasNext(); ) {
+                String metricName = itr.next();
+                String[] parts = metricName.split(MetricUtils.DELIM);
+                if (parts.length < 7 || !parts[3].equals(taskIdStr)) {
+                    itr.remove();
+                }
+            }
+        } else {
+            streamMetricInfo = MetricUtils.mkMetricInfo();
+        }
+        return Lists.newArrayList(taskMetricInfo, streamMetricInfo);
     }
 
     @Override
-    public void updateConf(String name, String conf) throws NotAliveException,
-            InvalidTopologyException, TException {
+    public List<MetricInfo> getSummarizedTopologyMetrics(String topologyId) throws TException {
+        return data.getMetricCache().getMetricData(topologyId, MetaType.TOPOLOGY);
+    }
+
+    @Override
+    public void updateTopology(String name, String uploadedLocation,
+            String updateConf) throws NotAliveException,
+                    InvalidTopologyException, TException {
         try {
             checkTopologyActive(data, name, true);
 
+            String topologyId = null;
+            StormClusterState stormClusterState = data.getStormClusterState();
+            topologyId = Cluster.get_topology_id(stormClusterState, name);
+            if (topologyId == null) {
+                throw new NotAliveException(name);
+            }
+            if (uploadedLocation != null) {
+                String stormroot =
+                        StormConfig.masterStormdistRoot(conf, topologyId);
+
+                int lastIndexOf = uploadedLocation.lastIndexOf("/");
+                // /local-dir/nimbus/inbox/xxxx/
+                String tmpDir = uploadedLocation.substring(0, lastIndexOf);
+
+                // /local-dir/nimbus/inbox/xxxx/stormjar.jar
+                String stormJarPath = StormConfig.stormjar_path(tmpDir);
+
+                File file = new File(uploadedLocation);
+                if (file.exists()) {
+                    file.renameTo(new File(stormJarPath));
+                } else {
+                    throw new FileNotFoundException("Source \'"
+                            + uploadedLocation + "\' does not exist");
+                }
+                // move fileDir to /local-dir/nimbus/topologyid/
+                File srcDir = new File(tmpDir);
+                File destDir = new File(stormroot);
+                try {
+                    FileUtils.moveDirectory(srcDir, destDir);
+                } catch (FileExistsException e) {
+                    FileUtils.copyDirectory(srcDir, destDir);
+                    FileUtils.deleteQuietly(srcDir);
+                }
+                               // Update downloadCode timeStamp
+                               StormConfig.write_nimbus_topology_timestamp(data.getConf(), topologyId, System.currentTimeMillis());
+                LOG.info("update jar of " + name + " successfully");
+            }
+
+            Map topoConf = StormConfig.read_nimbus_topology_conf(data.getConf(),
+                    topologyId);
             Map<Object, Object> config =
-                    (Map<Object, Object>) JStormUtils.from_json(conf);
+                    (Map<Object, Object>) JStormUtils.from_json(updateConf);
+            topoConf.putAll(config);
+            StormConfig.write_nimbus_topology_conf(data.getConf(), topologyId,
+                    topoConf);
 
+            LOG.info("update topology " + name + " successfully");
             NimbusUtils.transitionName(data, name, true,
-                    StatusType.update_conf, config);
+                    StatusType.update_topology, config);
+
         } catch (NotAliveException e) {
-            String errMsg = "Rebalance Error, no this topology " + name;
+            String errMsg = "Error, no this topology " + name;
             LOG.error(errMsg, e);
             throw new NotAliveException(errMsg);
         } catch (Exception e) {
-            String errMsg = "Failed to rebalance topology " + name;
+            String errMsg = "Failed to update topology " + name;
             LOG.error(errMsg, e);
             throw new TException(errMsg);
         }
+
+    }
+
+    @Override
+    public void updateTaskHeartbeat(TopologyTaskHbInfo taskHbs) throws TException {
+        String topologyId = taskHbs.get_topologyId();
+        Integer topologyMasterId = taskHbs.get_topologyMasterId();
+        TopologyTaskHbInfo nimbusTaskHbs = data.getTasksHeartbeat().get(topologyId);
+
+        if (nimbusTaskHbs == null) {
+            nimbusTaskHbs = new TopologyTaskHbInfo(topologyId, topologyMasterId);
+            data.getTasksHeartbeat().put(topologyId, nimbusTaskHbs);
+        }
+
+        Map<Integer, TaskHeartbeat> nimbusTaskHbMap = nimbusTaskHbs.get_taskHbs();
+        if (nimbusTaskHbMap == null) {
+            nimbusTaskHbMap = new ConcurrentHashMap<Integer, TaskHeartbeat>();
+            nimbusTaskHbs.set_taskHbs(nimbusTaskHbMap);
+        }
         
+        Map<Integer, TaskHeartbeat> taskHbMap = taskHbs.get_taskHbs();
+        if (taskHbMap != null) {
+            for (Entry<Integer, TaskHeartbeat> entry : taskHbMap.entrySet()) {
+                nimbusTaskHbMap.put(entry.getKey(), entry.getValue());
+            }
+        }
     }
 }
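
A minimal client-side sketch (not part of this patch) of how the paging contract of getPagingNettyMetrics above could be exercised over the Nimbus Thrift interface. NimbusClient/Utils usage, the class name, and the literal page size are assumptions; the real page size is MetricUtils.NETTY_METRIC_PAGE_SIZE, and page numbering is 1-based as implied by the patch.

// Hypothetical usage sketch, not part of this commit.
import java.util.Map;

import backtype.storm.generated.MetricInfo;
import backtype.storm.generated.Nimbus;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;

public class NettyMetricPager {
    // Illustrative stand-in for MetricUtils.NETTY_METRIC_PAGE_SIZE used in the patch.
    private static final int PAGE_SIZE = 200;

    public static void main(String[] args) throws Exception {
        String topologyId = args[0]; // a full topology id, not the topology name
        String host = args[1];       // supervisor host whose Netty connections we want

        Map conf = Utils.readStormConfig();
        NimbusClient client = NimbusClient.getConfiguredClient(conf);
        try {
            Nimbus.Client nimbus = client.getClient();
            // Total matching metric entries for this host, used to derive the page count.
            int size = nimbus.getNettyMetricSizeByHost(topologyId, host);
            int pages = (size + PAGE_SIZE - 1) / PAGE_SIZE;
            for (int page = 1; page <= pages; page++) {
                MetricInfo info = nimbus.getPagingNettyMetrics(topologyId, host, page);
                System.out.printf("page %d: %d metrics%n", page, info.get_metrics_size());
            }
        } finally {
            client.close();
        }
    }
}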
