http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/ServiceHandler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/ServiceHandler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/ServiceHandler.java
index 513e83f..b60864a 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/ServiceHandler.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/ServiceHandler.java
@@ -18,95 +18,54 @@
*/
package com.alibaba.jstorm.daemon.nimbus;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.security.InvalidParameterException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.Config;
import backtype.storm.daemon.Shutdownable;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.ComponentSummary;
-import backtype.storm.generated.Credentials;
-import backtype.storm.generated.ErrorInfo;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.KillOptions;
-import backtype.storm.generated.MetricInfo;
-import backtype.storm.generated.MonitorOptions;
-import backtype.storm.generated.NettyMetric;
+import backtype.storm.generated.*;
import backtype.storm.generated.Nimbus.Iface;
-import backtype.storm.generated.NimbusSummary;
-import backtype.storm.generated.NotAliveException;
-import backtype.storm.generated.RebalanceOptions;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.generated.SupervisorSummary;
-import backtype.storm.generated.SupervisorWorkers;
-import backtype.storm.generated.TaskComponent;
-import backtype.storm.generated.TaskSummary;
-import backtype.storm.generated.TopologyAssignException;
-import backtype.storm.generated.TopologyInfo;
-import backtype.storm.generated.TopologyInitialStatus;
-import backtype.storm.generated.TopologyMetric;
-import backtype.storm.generated.TopologySummary;
-import backtype.storm.generated.WorkerSummary;
-import backtype.storm.generated.WorkerUploadMetrics;
import backtype.storm.utils.BufferFileInputStream;
import backtype.storm.utils.TimeCacheMap;
import backtype.storm.utils.Utils;
-
import com.alibaba.jstorm.callback.impl.RemoveTransitionCallback;
-import com.alibaba.jstorm.cluster.Cluster;
-import com.alibaba.jstorm.cluster.Common;
-import com.alibaba.jstorm.cluster.DaemonCommon;
-import com.alibaba.jstorm.cluster.StormBase;
-import com.alibaba.jstorm.cluster.StormClusterState;
-import com.alibaba.jstorm.cluster.StormConfig;
+import com.alibaba.jstorm.client.ConfigExtension;
+import com.alibaba.jstorm.cluster.*;
import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
-import com.alibaba.jstorm.metric.MetricDef;
+import com.alibaba.jstorm.metric.MetaType;
+import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.metric.SimpleJStormMetric;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.task.TaskInfo;
import com.alibaba.jstorm.task.error.TaskError;
-import com.alibaba.jstorm.task.heartbeat.TaskHeartbeat;
-import com.alibaba.jstorm.utils.FailedAssignTopologyException;
-import com.alibaba.jstorm.utils.JStormUtils;
-import com.alibaba.jstorm.utils.NetWorkUtils;
-import com.alibaba.jstorm.utils.PathUtils;
-import com.alibaba.jstorm.utils.Thrift;
-import com.alibaba.jstorm.utils.TimeUtils;
+import com.alibaba.jstorm.utils.*;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileExistsException;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.security.InvalidParameterException;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable.*;
/**
* Thrift callback, all commands handling entrance
- *
+ *
* @author version 1: lixin, version 2:Longda
- *
*/
public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
- private final static Logger LOG = LoggerFactory
- .getLogger(ServiceHandler.class);
+ private final static Logger LOG =
LoggerFactory.getLogger(ServiceHandler.class);
public final static int THREAD_NUM = 64;
@@ -138,24 +97,18 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
}
@Override
- public void submitTopology(String name, String uploadedJarLocation,
- String jsonConf, StormTopology topology)
- throws AlreadyAliveException, InvalidTopologyException,
- TopologyAssignException, TException {
+ public void submitTopology(String name, String uploadedJarLocation, String
jsonConf, StormTopology topology) throws TException, AlreadyAliveException,
+ InvalidTopologyException, TopologyAssignException {
SubmitOptions options = new
SubmitOptions(TopologyInitialStatus.ACTIVE);
-
- submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology,
- options);
+ submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology,
options);
}
- private void makeAssignment(String topologyName, String topologyId,
- TopologyInitialStatus status) throws FailedAssignTopologyException
{
+ private void makeAssignment(String topologyName, String topologyId,
TopologyInitialStatus status) throws FailedAssignTopologyException {
TopologyAssignEvent assignEvent = new TopologyAssignEvent();
assignEvent.setTopologyId(topologyId);
assignEvent.setScratch(false);
assignEvent.setTopologyName(topologyName);
- assignEvent.setOldStatus(Thrift
- .topologyInitialStatusToStormStatus(status));
+
assignEvent.setOldStatus(Thrift.topologyInitialStatusToStormStatus(status));
TopologyAssign.push(assignEvent);
@@ -169,86 +122,100 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
/**
* Submit one Topology
- *
- * @param topologyname String: topology name
+ *
+ * @param topologyName String: topology name
* @param uploadedJarLocation String: already uploaded jar path
- * @param jsonConf String: jsonConf serialize all toplogy configuration to
- * Json
- * @param topology StormTopology: topology Object
+ * @param jsonConf String: jsonConf serialize all toplogy
configuration to
+ * Json
+ * @param topology StormTopology: topology Object
*/
@SuppressWarnings("unchecked")
@Override
- public void submitTopologyWithOpts(String topologyname,
- String uploadedJarLocation, String jsonConf,
- StormTopology topology, SubmitOptions options)
- throws AlreadyAliveException, InvalidTopologyException,
- TopologyAssignException, TException {
- LOG.info("Receive " + topologyname + ", uploadedJarLocation:"
- + uploadedJarLocation);
+ public void submitTopologyWithOpts(String topologyName, String
uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions
options)
+ throws AlreadyAliveException, InvalidTopologyException,
TopologyAssignException, TException {
+ LOG.info("Receive " + topologyName + ", uploadedJarLocation:" +
uploadedJarLocation);
long start = System.nanoTime();
+
+ //check topologyname is valid
+ if (!Common.charValidate(topologyName)) {
+ throw new InvalidTopologyException(topologyName + " is not a valid
topology name");
+ }
+
try {
- checkTopologyActive(data, topologyname, false);
+ checkTopologyActive(data, topologyName, false);
} catch (AlreadyAliveException e) {
- LOG.info(topologyname + " is already exist ");
+ LOG.info(topologyName + " already exists ");
throw e;
} catch (Throwable e) {
LOG.info("Failed to check whether topology is alive or not", e);
throw new TException(e);
}
- int counter = data.getSubmittedCount().incrementAndGet();
- String topologyId = Common.topologyNameToId(topologyname, counter);
- data.getPendingSubmitTopoloygs().put(topologyId, null);
-
+ String topologyId = null;
+ synchronized (data) {
+ // avoid to the same topologys wered submmitted at the same time
+ Set<String> pendingTopologys =
+ data.getPendingSubmitTopoloygs().keySet();
+ for (String cachTopologyId : pendingTopologys) {
+ if (cachTopologyId.contains(topologyName + "-"))
+ throw new AlreadyAliveException(
+ topologyName + " were submitted");
+ }
+ int counter = data.getSubmittedCount().incrementAndGet();
+ topologyId = Common.topologyNameToId(topologyName, counter);
+ data.getPendingSubmitTopoloygs().put(topologyId, null);
+ }
try {
- Map<Object, Object> serializedConf =
- (Map<Object, Object>) JStormUtils.from_json(jsonConf);
+ Map<Object, Object> serializedConf = (Map<Object, Object>)
JStormUtils.from_json(jsonConf);
if (serializedConf == null) {
LOG.warn("Failed to serialized Configuration");
- throw new InvalidTopologyException(
- "Failed to serilaze topology configuration");
+ throw new InvalidTopologyException("Failed to serialize
topology configuration");
}
serializedConf.put(Config.TOPOLOGY_ID, topologyId);
- serializedConf.put(Config.TOPOLOGY_NAME, topologyname);
+ serializedConf.put(Config.TOPOLOGY_NAME, topologyName);
Map<Object, Object> stormConf;
- stormConf =
- NimbusUtils.normalizeConf(conf, serializedConf, topology);
+ stormConf = NimbusUtils.normalizeConf(conf, serializedConf,
topology);
LOG.info("Normalized configuration:" + stormConf);
- data.getTopologyNettyMgr().setTopology(stormConf);
- Map<Object, Object> totalStormConf =
- new HashMap<Object, Object>(conf);
+ Map<Object, Object> totalStormConf = new HashMap<Object,
Object>(conf);
totalStormConf.putAll(stormConf);
- StormTopology normalizedTopology =
- NimbusUtils.normalizeTopology(stormConf, topology, true);
+ StormTopology normalizedTopology =
NimbusUtils.normalizeTopology(stormConf, topology, true);
// this validates the structure of the topology
- Common.validate_basic(normalizedTopology, totalStormConf,
- topologyId);
-
+ Common.validate_basic(normalizedTopology, totalStormConf,
topologyId);
// don't need generate real topology, so skip
Common.system_topology
// Common.system_topology(totalStormConf, topology);
StormClusterState stormClusterState = data.getStormClusterState();
+ double metricsSampleRate =
ConfigExtension.getMetricSampleRate(stormConf);
// create /local-dir/nimbus/topologyId/xxxx files
- setupStormCode(conf, topologyId, uploadedJarLocation, stormConf,
- normalizedTopology);
+ setupStormCode(conf, topologyId, uploadedJarLocation, stormConf,
normalizedTopology);
// generate TaskInfo for every bolt or spout in ZK
// /ZK/tasks/topoologyId/xxx
setupZkTaskInfo(conf, topologyId, stormClusterState);
// make assignments for a topology
- LOG.info("Submit for " + topologyname + " with conf "
- + serializedConf);
- makeAssignment(topologyname, topologyId,
- options.get_initial_status());
+ LOG.info("Submit for " + topologyName + " with conf " +
serializedConf);
+ makeAssignment(topologyName, topologyId,
options.get_initial_status());
+
+ // when make assignment for a topology,so remove the topologyid
form
+ // pendingSubmitTopologys
+ data.getPendingSubmitTopoloygs().remove(topologyId);
+
+ // push start event after startup
+ StartTopologyEvent startEvent = new StartTopologyEvent();
+ startEvent.clusterName = this.data.getClusterName();
+ startEvent.topologyId = topologyId;
+ startEvent.timestamp = System.currentTimeMillis();
+ startEvent.sampleRate = metricsSampleRate;
+ this.data.getMetricRunnable().pushEvent(startEvent);
} catch (FailedAssignTopologyException e) {
StringBuilder sb = new StringBuilder();
@@ -291,28 +258,27 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
LOG.error(sb.toString(), e);
data.getPendingSubmitTopoloygs().remove(topologyId);
throw new TopologyAssignException(sb.toString());
- }finally {
- double spend = (System.nanoTime() - start)/1000000.0d;
- SimpleJStormMetric.updateHistorgram("submitTopologyWithOpts",
spend);
- LOG.info("submitTopologyWithOpts {} spend {}ms", topologyname,
spend);
+ } finally {
+ double spend = (System.nanoTime() - start) / TimeUtils.NS_PER_US;
+ SimpleJStormMetric.updateNimbusHistogram("submitTopologyWithOpts",
spend);
+ LOG.info("submitTopologyWithOpts {} costs {}ms", topologyName,
spend);
}
}
/**
* kill topology
- *
- * @param topologyname String topology name
+ *
+ * @param topologyName String topology name
*/
@Override
- public void killTopology(String name) throws NotAliveException, TException
{
- killTopologyWithOpts(name, new KillOptions());
+ public void killTopology(String topologyName) throws TException,
NotAliveException {
+ killTopologyWithOpts(topologyName, new KillOptions());
}
@Override
- public void killTopologyWithOpts(String topologyName, KillOptions options)
- throws NotAliveException, TException {
+ public void killTopologyWithOpts(String topologyName, KillOptions options)
throws TException, NotAliveException {
try {
checkTopologyActive(data, topologyName, true);
@@ -322,17 +288,13 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
if (options.is_set_wait_secs()) {
wait_amt = options.get_wait_secs();
}
- NimbusUtils.transitionName(data, topologyName, true,
- StatusType.kill, wait_amt);
+ NimbusUtils.transitionName(data, topologyName, true,
StatusType.kill, wait_amt);
- TopologyMetricsRunnable.Remove event =
- new TopologyMetricsRunnable.Remove();
+ Remove event = new Remove();
event.topologyId = topologyId;
-
data.getMetricRunnable().pushEvent(event);
} catch (NotAliveException e) {
- String errMsg =
- "KillTopology Error, no this topology " + topologyName;
+ String errMsg = "KillTopology Error, no this topology " +
topologyName;
LOG.error(errMsg, e);
throw new NotAliveException(errMsg);
} catch (Exception e) {
@@ -345,16 +307,13 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
/**
* set topology status as active
- *
- * @param topologyname
- *
+ *
+ * @param topologyName
*/
@Override
- public void activate(String topologyName) throws NotAliveException,
- TException {
+ public void activate(String topologyName) throws TException,
NotAliveException {
try {
- NimbusUtils.transitionName(data, topologyName, true,
- StatusType.activate);
+ NimbusUtils.transitionName(data, topologyName, true,
StatusType.activate);
} catch (NotAliveException e) {
String errMsg = "Activate Error, no this topology " + topologyName;
LOG.error(errMsg, e);
@@ -369,20 +328,15 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
/**
* set topology stauts as deactive
- *
- * @param topologyname
- *
+ *
+ * @param topologyName
*/
@Override
- public void deactivate(String topologyName) throws NotAliveException,
- TException {
-
+ public void deactivate(String topologyName) throws TException,
NotAliveException {
try {
- NimbusUtils.transitionName(data, topologyName, true,
- StatusType.inactivate);
+ NimbusUtils.transitionName(data, topologyName, true,
StatusType.inactivate);
} catch (NotAliveException e) {
- String errMsg =
- "Deactivate Error, no this topology " + topologyName;
+ String errMsg = "Deactivate Error, no this topology " +
topologyName;
LOG.error(errMsg, e);
throw new NotAliveException(errMsg);
} catch (Exception e) {
@@ -395,20 +349,16 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
/**
* rebalance one topology
- *
+ *
+ * @param topologyName topology name
+ * @param options RebalanceOptions
* @@@ rebalance options hasn't implements
- *
- * It is used to let workers wait several seconds to finish jobs
- *
- * @param topologyname String
- * @param options RebalanceOptions
+ * <p/>
+ * It is used to let workers wait several seconds to finish jobs
*/
@Override
- public void rebalance(String topologyName, RebalanceOptions options)
- throws NotAliveException, TException, InvalidTopologyException {
-
+ public void rebalance(String topologyName, RebalanceOptions options)
throws TException, NotAliveException {
try {
-
checkTopologyActive(data, topologyName, true);
Integer wait_amt = null;
String jsonConf = null;
@@ -422,15 +372,11 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
jsonConf = options.get_conf();
}
- LOG.info("Begin to rebalance " + topologyName + "wait_time:"
- + wait_amt + ", reassign: " + reassign
- + ", new worker/bolt configuration:" + jsonConf);
+ LOG.info("Begin to rebalance " + topologyName + "wait_time:" +
wait_amt + ", reassign: " + reassign + ", new worker/bolt configuration:" +
jsonConf);
- Map<Object, Object> conf =
- (Map<Object, Object>) JStormUtils.from_json(jsonConf);
+ Map<Object, Object> conf = (Map<Object, Object>)
JStormUtils.from_json(jsonConf);
- NimbusUtils.transitionName(data, topologyName, true,
- StatusType.rebalance, wait_amt, reassign, conf);
+ NimbusUtils.transitionName(data, topologyName, true,
StatusType.rebalance, wait_amt, reassign, conf);
} catch (NotAliveException e) {
String errMsg = "Rebalance Error, no this topology " +
topologyName;
LOG.error(errMsg, e);
@@ -444,13 +390,12 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
}
@Override
- public void restart(String name, String jsonConf) throws NotAliveException,
- InvalidTopologyException, TopologyAssignException, TException {
+ public void restart(String name, String jsonConf) throws TException,
NotAliveException, InvalidTopologyException, TopologyAssignException {
LOG.info("Begin to restart " + name + ", new configuration:" +
jsonConf);
// 1. get topologyId
StormClusterState stormClusterState = data.getStormClusterState();
- String topologyId = null;
+ String topologyId;
try {
topologyId = Cluster.get_topology_id(stormClusterState, name);
} catch (Exception e2) {
@@ -468,30 +413,26 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
LOG.info("Deactivate " + name);
// 3. backup old jar/configuration/topology
- StormTopology topology = null;
-
- Map topologyConf = null;
+ StormTopology topology;
+ Map topologyConf;
String topologyCodeLocation = null;
try {
topology = StormConfig.read_nimbus_topology_code(conf, topologyId);
-
- topologyConf =
- StormConfig.read_nimbus_topology_conf(conf, topologyId);
+ topologyConf = StormConfig.read_nimbus_topology_conf(conf,
topologyId);
if (jsonConf != null) {
- Map<Object, Object> newConf =
- (Map<Object, Object>) JStormUtils.from_json(jsonConf);
+ Map<Object, Object> newConf = (Map<Object, Object>)
JStormUtils.from_json(jsonConf);
topologyConf.putAll(newConf);
}
// Copy storm files back to stormdist dir from the tmp dir
- String oldDistDir =
- StormConfig.masterStormdistRoot(conf, topologyId);
+ String oldDistDir = StormConfig.masterStormdistRoot(conf,
topologyId);
String parent = StormConfig.masterInbox(conf);
topologyCodeLocation = parent + PathUtils.SEPERATOR + topologyId;
FileUtils.forceMkdir(new File(topologyCodeLocation));
FileUtils.cleanDirectory(new File(topologyCodeLocation));
- FileUtils.copyDirectory(new File(oldDistDir), new File(
- topologyCodeLocation));
+ File stormDistDir = new File(oldDistDir);
+ stormDistDir.setLastModified(System.currentTimeMillis());
+ FileUtils.copyDirectory(stormDistDir, new
File(topologyCodeLocation));
LOG.info("Successfully read old jar/conf/topology " + name);
} catch (Exception e) {
@@ -499,8 +440,7 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
if (topologyCodeLocation != null) {
try {
PathUtils.rmr(topologyCodeLocation);
- } catch (IOException e1) {
-
+ } catch (IOException ignored) {
}
}
throw new TException("Failed to read old jar/conf/topology ");
@@ -509,24 +449,31 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
// 4. Kill
// directly use remove command to kill, more stable than issue kill cmd
- RemoveTransitionCallback killCb =
- new RemoveTransitionCallback(data, topologyId);
+ RemoveTransitionCallback killCb = new RemoveTransitionCallback(data,
topologyId);
killCb.execute(new Object[0]);
LOG.info("Successfully kill the topology " + name);
+ // send metric events
+ TopologyMetricsRunnable.KillTopologyEvent killEvent = new
TopologyMetricsRunnable.KillTopologyEvent();
+ killEvent.clusterName = this.data.getClusterName();
+ killEvent.topologyId = topologyId;
+ killEvent.timestamp = System.currentTimeMillis();
+ this.data.getMetricRunnable().pushEvent(killEvent);
+
+ Remove removeEvent = new Remove();
+ removeEvent.topologyId = topologyId;
+ this.data.getMetricRunnable().pushEvent(removeEvent);
+
// 5. submit
try {
- submitTopology(name, topologyCodeLocation,
- JStormUtils.to_json(topologyConf), topology);
-
+ submitTopology(name, topologyCodeLocation,
JStormUtils.to_json(topologyConf), topology);
} catch (AlreadyAliveException e) {
LOG.info("Failed to kill the topology" + name);
throw new TException("Failed to kill the topology" + name);
} finally {
try {
PathUtils.rmr(topologyCodeLocation);
- } catch (IOException e1) {
-
+ } catch (IOException ignored) {
}
}
@@ -537,11 +484,9 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
try {
String parent = PathUtils.parent_path(libName);
PathUtils.local_mkdirs(parent);
- data.getUploaders().put(libName,
- Channels.newChannel(new FileOutputStream(libName)));
+ data.getUploaders().put(libName, Channels.newChannel(new
FileOutputStream(libName)));
LOG.info("Begin upload file from client to " + libName);
} catch (Exception e) {
- // TODO Auto-generated catch block
LOG.error("Fail to upload jar " + libName, e);
throw new TException(e);
}
@@ -549,23 +494,20 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
/**
* prepare to uploading topology jar, return the file location
- *
- * @throws
*/
@Override
public String beginFileUpload() throws TException {
String fileLoc = null;
try {
- String path = null;
+ String path;
String key = UUID.randomUUID().toString();
path = StormConfig.masterInbox(conf) + "/" + key;
FileUtils.forceMkdir(new File(path));
FileUtils.cleanDirectory(new File(path));
fileLoc = path + "/stormjar-" + key + ".jar";
- data.getUploaders().put(fileLoc,
- Channels.newChannel(new FileOutputStream(fileLoc)));
+ data.getUploaders().put(fileLoc, Channels.newChannel(new
FileOutputStream(fileLoc)));
LOG.info("Begin upload file from client to " + fileLoc);
return path;
} catch (FileNotFoundException e) {
@@ -581,14 +523,11 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
* uploading topology jar data
*/
@Override
- public void uploadChunk(String location, ByteBuffer chunk)
- throws TException {
+ public void uploadChunk(String location, ByteBuffer chunk) throws
TException {
TimeCacheMap<Object, Object> uploaders = data.getUploaders();
Object obj = uploaders.get(location);
if (obj == null) {
- throw new TException(
- "File for that location does not exist (or timed out) "
- + location);
+ throw new TException("File for that location does not exist (or
timed out) " + location);
}
try {
if (obj instanceof WritableByteChannel) {
@@ -596,13 +535,10 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
channel.write(chunk);
uploaders.put(location, channel);
} else {
- throw new TException("Object isn't WritableByteChannel for "
- + location);
+ throw new TException("Object isn't WritableByteChannel for " +
location);
}
} catch (IOException e) {
- String errMsg =
- " WritableByteChannel write filed when uploadChunk "
- + location;
+ String errMsg = " WritableByteChannel write filed when uploadChunk
" + location;
LOG.error(errMsg);
throw new TException(e);
}
@@ -611,12 +547,10 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
@Override
public void finishFileUpload(String location) throws TException {
-
TimeCacheMap<Object, Object> uploaders = data.getUploaders();
Object obj = uploaders.get(location);
if (obj == null) {
- throw new TException(
- "File for that location does not exist (or timed out)");
+ throw new TException("File for that location does not exist (or
timed out)");
}
try {
if (obj instanceof WritableByteChannel) {
@@ -625,25 +559,20 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
uploaders.remove(location);
LOG.info("Finished uploading file from client: " + location);
} else {
- throw new TException("Object isn't WritableByteChannel for "
- + location);
+ throw new TException("Object isn't WritableByteChannel for " +
location);
}
} catch (IOException e) {
- LOG.error(" WritableByteChannel close failed when finishFileUpload
"
- + location);
+ LOG.error(" WritableByteChannel close failed when finishFileUpload
" + location);
}
}
@Override
public String beginFileDownload(String file) throws TException {
- BufferFileInputStream is = null;
- String id = null;
+ BufferFileInputStream is;
+ String id;
try {
- int bufferSize =
- JStormUtils.parseInt(
- conf.get(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE),
- 1024 * 1024) / 2;
+ int bufferSize =
JStormUtils.parseInt(conf.get(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE), 1024 *
1024) / 2;
is = new BufferFileInputStream(file, bufferSize);
id = UUID.randomUUID().toString();
@@ -670,16 +599,14 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
BufferFileInputStream is = (BufferFileInputStream) obj;
byte[] ret = is.read();
if (ret != null) {
- downloaders.put(id, (BufferFileInputStream) is);
+ downloaders.put(id, is);
return ByteBuffer.wrap(ret);
}
} else {
- throw new TException("Object isn't BufferFileInputStream for "
- + id);
+ throw new TException("Object isn't BufferFileInputStream for "
+ id);
}
} catch (IOException e) {
- LOG.error("BufferFileInputStream read failed when downloadChunk ",
- e);
+ LOG.error("BufferFileInputStream read failed when downloadChunk ",
e);
throw new TException(e);
}
byte[] empty = {};
@@ -692,72 +619,49 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
}
/**
- * get cluster's summary, it will contain SupervisorSummary and
- * TopologySummary
- *
+ * get cluster's summary, it will contain SupervisorSummary and
TopologySummary
+ *
* @return ClusterSummary
*/
@Override
public ClusterSummary getClusterInfo() throws TException {
long start = System.nanoTime();
try {
-
StormClusterState stormClusterState = data.getStormClusterState();
- Map<String, Assignment> assignments =
- new HashMap<String, Assignment>();
+ Map<String, Assignment> assignments = new HashMap<String,
Assignment>();
// get TopologySummary
- List<TopologySummary> topologySummaries =
- NimbusUtils.getTopologySummary(stormClusterState,
- assignments);
+ List<TopologySummary> topologySummaries =
NimbusUtils.getTopologySummary(stormClusterState, assignments);
// all supervisors
- Map<String, SupervisorInfo> supervisorInfos =
- Cluster.get_all_SupervisorInfo(stormClusterState, null);
+ Map<String, SupervisorInfo> supervisorInfos =
Cluster.get_all_SupervisorInfo(stormClusterState, null);
// generate SupervisorSummaries
- List<SupervisorSummary> supervisorSummaries =
- NimbusUtils.mkSupervisorSummaries(supervisorInfos,
- assignments);
-
- NimbusSummary nimbusSummary =
- NimbusUtils.getNimbusSummary(stormClusterState,
- supervisorSummaries, data);
+ List<SupervisorSummary> supervisorSummaries =
NimbusUtils.mkSupervisorSummaries(supervisorInfos, assignments);
- ClusterSummary ret =
- new ClusterSummary(nimbusSummary, supervisorSummaries,
- topologySummaries);
-
- return ret;
+ NimbusSummary nimbusSummary =
NimbusUtils.getNimbusSummary(stormClusterState, supervisorSummaries, data);
+ return new ClusterSummary(nimbusSummary, supervisorSummaries,
topologySummaries);
} catch (TException e) {
LOG.info("Failed to get ClusterSummary ", e);
throw e;
} catch (Exception e) {
LOG.info("Failed to get ClusterSummary ", e);
throw new TException(e);
- }finally {
- double spend = (System.nanoTime() - start)/1000000.0d;
- SimpleJStormMetric.updateHistorgram("getClusterInfo", spend);
- LOG.info("getClusterInfo spend {}ms", spend);
+ } finally {
+ long end = System.nanoTime();
+ SimpleJStormMetric.updateNimbusHistogram("getClusterInfo", (end -
start) / TimeUtils.NS_PER_US);
}
}
@Override
public String getVersion() throws TException {
- try {
- return Utils.getVersion();
- }catch(Exception e) {
- String errMsg = "!!! Binary has been changed, please
restart Nimbus !!! ";
- LOG.error(errMsg, e);
- throw new TException(errMsg, e);
- }
+ return Utils.getVersion();
}
@Override
- public SupervisorWorkers getSupervisorWorkers(String host)
- throws NotAliveException, TException {
+ public SupervisorWorkers getSupervisorWorkers(String host) throws
NotAliveException, TException {
long start = System.nanoTime();
try {
@@ -770,15 +674,12 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
String hostName = NetWorkUtils.ip2Host(host);
// all supervisors
- Map<String, SupervisorInfo> supervisorInfos =
- Cluster.get_all_SupervisorInfo(stormClusterState, null);
+ Map<String, SupervisorInfo> supervisorInfos =
Cluster.get_all_SupervisorInfo(stormClusterState, null);
- for (Entry<String, SupervisorInfo> entry : supervisorInfos
- .entrySet()) {
+ for (Entry<String, SupervisorInfo> entry :
supervisorInfos.entrySet()) {
SupervisorInfo info = entry.getValue();
- if (info.getHostName().equals(hostName)
- || info.getHostName().equals(ip)) {
+ if (info.getHostName().equals(hostName) ||
info.getHostName().equals(ip)) {
supervisorId = entry.getKey();
supervisorInfo = info;
break;
@@ -789,29 +690,20 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
throw new TException("No supervisor of " + host);
}
- Map<String, Assignment> assignments =
- Cluster.get_all_assignment(stormClusterState, null);
-
- Map<Integer, WorkerSummary> portWorkerSummarys =
- new TreeMap<Integer, WorkerSummary>();
+ Map<String, Assignment> assignments =
Cluster.get_all_assignment(stormClusterState, null);
- Map<String, MetricInfo> metricInfoMap =
- new HashMap<String, MetricInfo>();
+ Map<Integer, WorkerSummary> portWorkerSummarys = new
TreeMap<Integer, WorkerSummary>();
int usedSlotNumber = 0;
- Map<String, Map<Integer, String>> topologyTaskToComponent =
- new HashMap<String, Map<Integer, String>>();
+ Map<String, Map<Integer, String>> topologyTaskToComponent = new
HashMap<String, Map<Integer, String>>();
+ Map<String, MetricInfo> metricInfoMap = new HashMap<String,
MetricInfo>();
for (Entry<String, Assignment> entry : assignments.entrySet()) {
String topologyId = entry.getKey();
Assignment assignment = entry.getValue();
Set<ResourceWorkerSlot> workers = assignment.getWorkers();
-
- TopologyMetric topologyMetric =
data.getMetricRunnable().getTopologyMetric(topologyId);
-
-
for (ResourceWorkerSlot worker : workers) {
if (supervisorId.equals(worker.getNodeId()) == false) {
continue;
@@ -829,67 +721,52 @@ public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
portWorkerSummarys.put(port, workerSummary);
}
- Map<Integer, String> taskToComponent =
- topologyTaskToComponent.get(topologyId);
+ Map<Integer, String> taskToComponent =
topologyTaskToComponent.get(topologyId);
if (taskToComponent == null) {
taskToComponent =
Cluster.get_all_task_component(stormClusterState, topologyId, null);
- topologyTaskToComponent
- .put(topologyId, taskToComponent);
+ topologyTaskToComponent.put(topologyId,
taskToComponent);
}
int earliest = TimeUtils.current_time_secs();
for (Integer taskId : worker.getTasks()) {
-
TaskComponent taskComponent = new TaskComponent();
- taskComponent
- .set_component(taskToComponent.get(taskId));
+
taskComponent.set_component(taskToComponent.get(taskId));
taskComponent.set_taskId(taskId);
- Integer startTime =
- assignment.getTaskStartTimeSecs().get(taskId);
+ Integer startTime =
assignment.getTaskStartTimeSecs().get(taskId);
if (startTime != null && startTime < earliest) {
earliest = startTime;
}
workerSummary.add_to_tasks(taskComponent);
-
}
workerSummary.set_uptime(TimeUtils.time_delta(earliest));
- if (topologyMetric == null) {
- LOG.warn("Failed to get topologyMetric of " +
topologyId);
- continue;
- }
-
- String workerSlotName =
- TopologyMetricsRunnable.getWorkerSlotName(
- supervisorInfo.getHostName(), port);
- if (topologyMetric.get_workerMetric() != null) {
- MetricInfo workerMetricInfo =
- topologyMetric.get_workerMetric().get(
- workerSlotName);
-
- if (workerMetricInfo != null) {
- metricInfoMap.put(workerSlotName,
workerMetricInfo);
+ String workerSlotName =
getWorkerSlotName(supervisorInfo.getHostName(), port);
+ List<MetricInfo> workerMetricInfoList =
this.data.getMetricCache().getMetricData(topologyId, MetaType.WORKER);
+ if (workerMetricInfoList.size() > 0) {
+ MetricInfo workerMetricInfo =
workerMetricInfoList.get(0);
+ // remove metrics that don't belong to current worker
+ for (Iterator<String> itr =
workerMetricInfo.get_metrics().keySet().iterator();
+ itr.hasNext(); ) {
+ String metricName = itr.next();
+ if (!metricName.contains(host)) {
+ itr.remove();
+ }
}
+ metricInfoMap.put(workerSlotName, workerMetricInfo);
}
}
}
- List<WorkerSummary> wokersList = new ArrayList<WorkerSummary>();
- wokersList.addAll(portWorkerSummarys.values());
+ List<WorkerSummary> workerList = new ArrayList<WorkerSummary>();
+ workerList.addAll(portWorkerSummarys.values());
- Map<String, Integer> supervisorToUsedSlotNum =
- new HashMap<String, Integer>();
+ Map<String, Integer> supervisorToUsedSlotNum = new HashMap<String,
Integer>();
supervisorToUsedSlotNum.put(supervisorId, usedSlotNumber);
- SupervisorSummary supervisorSummary =
- NimbusUtils.mkSupervisorSummary(supervisorInfo,
- supervisorId, supervisorToUsedSlotNum);
+ SupervisorSummary supervisorSummary =
NimbusUtils.mkSupervisorSummary(supervisorInfo, supervisorId,
supervisorToUsedSlotNum);
- SupervisorWorkers ret =
- new SupervisorWorkers(supervisorSummary, wokersList,
- metricInfoMap);
- return ret;
+ return new SupervisorWorkers(supervisorSummary, workerList,
metricInfoMap);
} catch (TException e) {
LOG.info("Failed to get ClusterSummary ", e);
@@ -897,21 +774,19 @@ public class ServiceHandler implements Iface,
Shutdownable, DaemonCommon {
} catch (Exception e) {
LOG.info("Failed to get ClusterSummary ", e);
throw new TException(e);
- }finally {
- double spend = (System.nanoTime() - start)/1000000.0d;
- SimpleJStormMetric.updateHistorgram("getSupervisorWorkers", spend);
- LOG.info("getSupervisorWorkers, {} spend {} ms", host, spend);
+ } finally {
+ long end = System.nanoTime();
+ SimpleJStormMetric.updateNimbusHistogram("getSupervisorWorkers",
(end - start) / TimeUtils.NS_PER_US);
}
}
/**
* Get TopologyInfo, it contain all data of the topology running status
- *
+ *
* @return TopologyInfo
*/
@Override
- public TopologyInfo getTopologyInfo(String topologyId)
- throws NotAliveException, TException {
+ public TopologyInfo getTopologyInfo(String topologyId) throws
NotAliveException, TException {
long start = System.nanoTime();
StormClusterState stormClusterState = data.getStormClusterState();
@@ -923,49 +798,40 @@ public class ServiceHandler implements Iface,
Shutdownable, DaemonCommon {
throw new NotAliveException("No topology of " + topologyId);
}
- Assignment assignment =
- stormClusterState.assignment_info(topologyId, null);
+ Assignment assignment =
stormClusterState.assignment_info(topologyId, null);
if (assignment == null) {
throw new NotAliveException("No topology of " + topologyId);
}
-
- Map<String, TaskHeartbeat> taskHBMap =
- Cluster.get_all_task_heartbeat(stormClusterState,
- topologyId);
+ TopologyTaskHbInfo topologyTaskHbInfo =
data.getTasksHeartbeat().get(topologyId);
+ Map<Integer, TaskHeartbeat> taskHbMap = null;
+ if (topologyTaskHbInfo != null)
+ taskHbMap = topologyTaskHbInfo.get_taskHbs();
Map<Integer, TaskInfo> taskInfoMap =
Cluster.get_all_taskInfo(stormClusterState, topologyId);
Map<Integer, String> taskToComponent =
Cluster.get_all_task_component(stormClusterState, topologyId, taskInfoMap);
Map<Integer, String> taskToType =
Cluster.get_all_task_type(stormClusterState, topologyId, taskInfoMap);
-
- String errorString = null;
+
+ String errorString;
if (Cluster.is_topology_exist_error(stormClusterState,
topologyId)) {
errorString = "Y";
} else {
errorString = "";
}
-
+
TopologySummary topologySummary = new TopologySummary();
topologySummary.set_id(topologyId);
topologySummary.set_name(base.getStormName());
- topologySummary.set_uptime_secs(TimeUtils.time_delta(base
- .getLanchTimeSecs()));
- ;
+
topologySummary.set_uptimeSecs(TimeUtils.time_delta(base.getLanchTimeSecs()));
topologySummary.set_status(base.getStatusString());
- topologySummary.set_num_tasks(NimbusUtils
- .getTopologyTaskNum(assignment));
- topologySummary.set_num_workers(assignment.getWorkers().size());
- topologySummary.set_error_info(errorString);
-
- Map<String, ComponentSummary> componentSummaryMap =
- new HashMap<String, ComponentSummary>();
+
topologySummary.set_numTasks(NimbusUtils.getTopologyTaskNum(assignment));
+ topologySummary.set_numWorkers(assignment.getWorkers().size());
+ topologySummary.set_errorInfo(errorString);
- HashMap<String, List<Integer>> componentToTasks =
- JStormUtils.reverse_map(taskToComponent);
-
- for (Entry<String, List<Integer>> entry : componentToTasks
- .entrySet()) {
+ Map<String, ComponentSummary> componentSummaryMap = new
HashMap<String, ComponentSummary>();
+ HashMap<String, List<Integer>> componentToTasks =
JStormUtils.reverse_map(taskToComponent);
+ for (Entry<String, List<Integer>> entry :
componentToTasks.entrySet()) {
String name = entry.getKey();
List<Integer> taskIds = entry.getValue();
if (taskIds == null || taskIds.size() == 0) {
@@ -979,43 +845,46 @@ public class ServiceHandler implements Iface,
Shutdownable, DaemonCommon {
componentSummary.set_name(name);
componentSummary.set_type(taskToType.get(taskIds.get(0)));
componentSummary.set_parallel(taskIds.size());
- componentSummary.set_task_ids(taskIds);
+ componentSummary.set_taskIds(taskIds);
}
- Map<Integer, TaskSummary> taskSummaryMap =
- new TreeMap<Integer, TaskSummary>();
- Map<Integer, List<TaskError> > taskErrors =
Cluster.get_all_task_errors(
- stormClusterState, topologyId);
+ Map<Integer, TaskSummary> taskSummaryMap = new TreeMap<Integer,
TaskSummary>();
+ Map<Integer, List<TaskError>> taskErrors =
Cluster.get_all_task_errors(stormClusterState, topologyId);
for (Integer taskId : taskInfoMap.keySet()) {
TaskSummary taskSummary = new TaskSummary();
taskSummaryMap.put(taskId, taskSummary);
- taskSummary.set_task_id(taskId);
- TaskHeartbeat hb = taskHBMap.get(String.valueOf(taskId));
- if (hb == null) {
+ taskSummary.set_taskId(taskId);
+ if (taskHbMap == null) {
taskSummary.set_status("Starting");
taskSummary.set_uptime(0);
} else {
- taskSummary.set_status("ACTIVE");
- taskSummary.set_uptime(hb.getUptimeSecs());
+ TaskHeartbeat hb = taskHbMap.get(taskId);
+ if (hb == null) {
+ taskSummary.set_status("Starting");
+ taskSummary.set_uptime(0);
+ } else {
+ boolean isInactive = NimbusUtils.isTaskDead(data,
topologyId, taskId);
+ if (isInactive)
+ taskSummary.set_status("INACTIVE");
+ else
+ taskSummary.set_status("ACTIVE");
+ taskSummary.set_uptime(hb.get_uptime());
+ }
}
if (StringUtils.isBlank(errorString)) {
- continue;
+ continue;
}
+
List<TaskError> taskErrorList = taskErrors.get(taskId);
if (taskErrorList != null && taskErrorList.size() != 0) {
for (TaskError taskError : taskErrorList) {
- ErrorInfo errorInfo =
- new ErrorInfo(taskError.getError(),
- taskError.getTimSecs());
-
+ ErrorInfo errorInfo = new
ErrorInfo(taskError.getError(), taskError.getTimSecs());
taskSummary.add_to_errors(errorInfo);
-
String component = taskToComponent.get(taskId);
- componentSummaryMap.get(component).add_to_errors(
- errorInfo);
+
componentSummaryMap.get(component).add_to_errors(errorInfo);
}
}
}
@@ -1033,14 +902,38 @@ public class ServiceHandler implements Iface,
Shutdownable, DaemonCommon {
TopologyInfo topologyInfo = new TopologyInfo();
topologyInfo.set_topology(topologySummary);
- topologyInfo.set_components(JStormUtils.mk_list(componentSummaryMap
- .values()));
- topologyInfo
- .set_tasks(JStormUtils.mk_list(taskSummaryMap.values()));
- topologyInfo.set_metrics(data.getMetricRunnable()
- .getTopologyMetric(topologyId));
-
-
+
topologyInfo.set_components(JStormUtils.mk_list(componentSummaryMap.values()));
+
topologyInfo.set_tasks(JStormUtils.mk_list(taskSummaryMap.values()));
+
+ // return topology, component & worker metrics only; task/stream/netty metrics are passed as freshly created placeholders
+ List<MetricInfo> tpMetricList =
data.getMetricCache().getMetricData(topologyId, MetaType.TOPOLOGY);
+ List<MetricInfo> compMetricList =
data.getMetricCache().getMetricData(topologyId, MetaType.COMPONENT);
+ List<MetricInfo> workerMetricList =
data.getMetricCache().getMetricData(topologyId, MetaType.WORKER);
+ MetricInfo taskMetric = MetricUtils.mkMetricInfo();
+ MetricInfo streamMetric = MetricUtils.mkMetricInfo();
+ MetricInfo nettyMetric = MetricUtils.mkMetricInfo();
+ MetricInfo tpMetric, compMetric, workerMetric;
+
+ if (tpMetricList == null || tpMetricList.size() == 0) {
+ tpMetric = MetricUtils.mkMetricInfo();
+ } else {
+ // get the last min topology metric
+ tpMetric = tpMetricList.get(tpMetricList.size() - 1);
+ }
+ if (compMetricList == null || compMetricList.size() == 0) {
+ compMetric = MetricUtils.mkMetricInfo();
+ } else {
+ compMetric = compMetricList.get(0);
+ }
+ if (workerMetricList == null || workerMetricList.size() == 0) {
+ workerMetric = MetricUtils.mkMetricInfo();
+ } else {
+ workerMetric = workerMetricList.get(0);
+ }
+ TopologyMetric topologyMetrics = new TopologyMetric(tpMetric,
compMetric, workerMetric,
+ taskMetric, streamMetric, nettyMetric);
+ topologyInfo.set_metrics(topologyMetrics);
+
return topologyInfo;
} catch (TException e) {
LOG.info("Failed to get topologyInfo " + topologyId, e);
@@ -1048,19 +941,15 @@ public class ServiceHandler implements Iface,
Shutdownable, DaemonCommon {
} catch (Exception e) {
LOG.info("Failed to get topologyInfo " + topologyId, e);
throw new TException("Failed to get topologyInfo" + topologyId);
- }finally {
+ } finally {
long end = System.nanoTime();
- double spend = (end - start)/1000000.0d;
- SimpleJStormMetric.updateHistorgram("getTopologyInfo", spend);
- LOG.info("Finish getTopologyInfo {}, spend {} ms", topologyId,
spend);
+ SimpleJStormMetric.updateNimbusHistogram("getTopologyInfo", (end -
start) / TimeUtils.NS_PER_US);
}
}
@Override
- public TopologyInfo getTopologyInfoByName(String topologyName)
- throws NotAliveException, TException {
-
+ public TopologyInfo getTopologyInfoByName(String topologyName) throws
NotAliveException, TException {
String topologyId = getTopologyId(topologyName);
return getTopologyInfo(topologyId);
@@ -1069,8 +958,7 @@ public class ServiceHandler implements Iface,
Shutdownable, DaemonCommon {
@Override
public String getNimbusConf() throws TException {
try {
- String ret = JStormUtils.to_json(data.getConf());
- return ret;
+ return JStormUtils.to_json(data.getConf());
} catch (Exception e) {
String err = "Failed to generate Nimbus configuration";
LOG.error(err, e);
@@ -1080,20 +968,17 @@ public class ServiceHandler implements Iface,
Shutdownable, DaemonCommon {
/**
* get topology configuration
- *
+ *
* @param id String: topology id
* @return String
*/
@Override
- public String getTopologyConf(String id) throws NotAliveException,
- TException {
+ public String getTopologyConf(String id) throws NotAliveException,
TException {
String rtn;
try {
- Map<Object, Object> topologyConf =
- StormConfig.read_nimbus_topology_conf(conf, id);
+ Map<Object, Object> topologyConf =
StormConfig.read_nimbus_topology_conf(conf, id);
rtn = JStormUtils.to_json(topologyConf);
} catch (IOException e) {
- // TODO Auto-generated catch block
LOG.info("Failed to get configuration of " + id, e);
throw new TException(e);
}
@@ -1101,15 +986,12 @@ public class ServiceHandler implements Iface,
Shutdownable, DaemonCommon {
}
@Override
- public String getTopologyId(String topologyName) throws NotAliveException,
- TException {
- // TODO Auto-generated method stub
+ public String getTopologyId(String topologyName) throws NotAliveException,
TException {
StormClusterState stormClusterState = data.getStormClusterState();
try {
// get all active topology's StormBase
- String topologyId =
- Cluster.get_topology_id(stormClusterState, topologyName);
+ String topologyId = Cluster.get_topology_id(stormClusterState,
topologyName);
if (topologyId != null) {
return topologyId;
}
@@ -1125,24 +1007,20 @@ public class ServiceHandler implements Iface,
Shutdownable, DaemonCommon {
/**
* get StormTopology throw deserialize local files
- *
+ *
* @param id String: topology id
* @return StormTopology
*/
@Override
- public StormTopology getTopology(String id) throws NotAliveException,
- TException {
- StormTopology topology = null;
+ public StormTopology getTopology(String id) throws NotAliveException,
TException {
+ StormTopology topology;
try {
- StormTopology stormtopology =
- StormConfig.read_nimbus_topology_code(conf, id);
+ StormTopology stormtopology =
StormConfig.read_nimbus_topology_code(conf, id);
if (stormtopology == null) {
throw new NotAliveException("No topology of " + id);
}
- Map<Object, Object> topologyConf =
- (Map<Object, Object>) StormConfig
- .read_nimbus_topology_conf(conf, id);
+ Map<Object, Object> topologyConf = (Map<Object, Object>)
StormConfig.read_nimbus_topology_conf(conf, id);
topology = Common.system_topology(topologyConf, stormtopology);
} catch (Exception e) {
@@ -1153,12 +1031,10 @@ public class ServiceHandler implements Iface,
Shutdownable, DaemonCommon {
}
@Override
- public StormTopology getUserTopology(String id) throws NotAliveException,
- TException {
+ public StormTopology getUserTopology(String id) throws NotAliveException,
TException {
StormTopology topology = null;
try {
- StormTopology stormtopology =
- StormConfig.read_nimbus_topology_code(conf, id);
+ StormTopology stormtopology =
StormConfig.read_nimbus_topology_code(conf, id);
if (stormtopology == null) {
throw new NotAliveException("No topology of " + id);
}
@@ -1173,35 +1049,32 @@ public class ServiceHandler implements Iface,
Shutdownable, DaemonCommon {
/**
* check whether the topology is bActive?
- *
+ *
* @param nimbus
* @param topologyName
* @param bActive
* @throws Exception
*/
- public void checkTopologyActive(NimbusData nimbus, String topologyName,
- boolean bActive) throws Exception {
+ public void checkTopologyActive(NimbusData nimbus, String topologyName,
boolean bActive) throws Exception {
if (isTopologyActive(nimbus.getStormClusterState(), topologyName) !=
bActive) {
if (bActive) {
throw new NotAliveException(topologyName + " is not alive");
} else {
- throw new AlreadyAliveException(topologyName
- + " is already active");
+ throw new AlreadyAliveException(topologyName + " is already
active");
}
}
}
/**
* whether the topology is active by topology name
- *
+ *
* @param stormClusterState see Cluster_clj
* @param topologyName
* @return boolean if the storm is active, return true, otherwise return
- * false;
+ * false;
* @throws Exception
*/
- public boolean isTopologyActive(StormClusterState stormClusterState,
- String topologyName) throws Exception {
+ public boolean isTopologyActive(StormClusterState stormClusterState,
String topologyName) throws Exception {
boolean rtn = false;
if (Cluster.get_topology_id(stormClusterState, topologyName) != null) {
rtn = true;
@@ -1213,7 +1086,7 @@ public class ServiceHandler implements Iface,
Shutdownable, DaemonCommon {
* create local topology files /local-dir/nimbus/topologyId/stormjar.jar
* /local-dir/nimbus/topologyId/stormcode.ser
* /local-dir/nimbus/topologyId/stormconf.ser
- *
+ *
* @param conf
* @param topologyId
* @param tmpJarLocation
@@ -1221,9 +1094,8 @@ public class ServiceHandler implements Iface,
Shutdownable, DaemonCommon {
* @param topology
* @throws IOException
*/
- private void setupStormCode(Map<Object, Object> conf, String topologyId,
- String tmpJarLocation, Map<Object, Object> stormConf,
- StormTopology topology) throws IOException {
+ private void setupStormCode(Map<Object, Object> conf, String topologyId,
String tmpJarLocation, Map<Object, Object> stormConf, StormTopology topology)
+ throws IOException {
// local-dir/nimbus/stormdist/topologyId
String stormroot = StormConfig.masterStormdistRoot(conf, topologyId);
@@ -1234,18 +1106,16 @@ public class ServiceHandler implements Iface,
Shutdownable, DaemonCommon {
setupJar(conf, tmpJarLocation, stormroot);
// serialize to file /local-dir/nimbus/topologyId/stormcode.ser
- FileUtils.writeByteArrayToFile(
- new File(StormConfig.stormcode_path(stormroot)),
- Utils.serialize(topology));
+ FileUtils.writeByteArrayToFile(new
File(StormConfig.stormcode_path(stormroot)), Utils.serialize(topology));
// serialize to file /local-dir/nimbus/topologyId/stormconf.ser
- FileUtils.writeByteArrayToFile(
- new File(StormConfig.stormconf_path(stormroot)),
- Utils.serialize(stormConf));
+ FileUtils.writeByteArrayToFile(new
File(StormConfig.stormconf_path(stormroot)), Utils.serialize(stormConf));
+
+ // Update downloadCode timeStamp
+ StormConfig.write_nimbus_topology_timestamp(data.getConf(),
topologyId, System.currentTimeMillis());
}
- private boolean copyLibJars(String tmpJarLocation, String stormroot)
- throws IOException {
+ private boolean copyLibJars(String tmpJarLocation, String stormroot)
throws IOException {
String srcLibPath = StormConfig.stormlib_path(tmpJarLocation);
String destLibPath = StormConfig.stormlib_path(stormroot);
LOG.info("Begin to copy from " + srcLibPath + " to " + destLibPath);
@@ -1265,14 +1135,13 @@ public class ServiceHandler implements Iface,
Shutdownable, DaemonCommon {
/**
* Copy jar to /local-dir/nimbus/topologyId/stormjar.jar
- *
+ *
* @param conf
* @param tmpJarLocation
* @param stormroot
* @throws IOException
*/
- private void setupJar(Map<Object, Object> conf, String tmpJarLocation,
- String stormroot) throws IOException {
+ private void setupJar(Map<Object, Object> conf, String tmpJarLocation,
String stormroot) throws IOException {
if (!StormConfig.local_mode(conf)) {
boolean existLibs = copyLibJars(tmpJarLocation, stormroot);
@@ -1287,8 +1156,7 @@ public class ServiceHandler implements Iface,
Shutdownable, DaemonCommon {
if (jarPath == null) {
if (existLibs == false) {
- throw new IllegalArgumentException("No jar under "
- + tmpJarLocation);
+ throw new IllegalArgumentException("No jar under " +
tmpJarLocation);
} else {
LOG.info("No submit jar");
return;
@@ -1297,86 +1165,71 @@ public class ServiceHandler implements Iface,
Shutdownable, DaemonCommon {
File srcFile = new File(jarPath);
if (!srcFile.exists()) {
- throw new IllegalArgumentException(jarPath + " to copy to "
- + stormroot + " does not exist!");
+ throw new IllegalArgumentException(jarPath + " to copy to " +
stormroot + " does not exist!");
}
String path = StormConfig.stormjar_path(stormroot);
File destFile = new File(path);
FileUtils.copyFile(srcFile, destFile);
srcFile.delete();
-
- return;
}
}
/**
* generate TaskInfo for every bolt or spout in ZK
/ZK/tasks/topoologyId/xxx
- *
+ *
* @param conf
* @param topologyId
* @param stormClusterState
* @throws Exception
*/
- public void setupZkTaskInfo(Map<Object, Object> conf, String topologyId,
- StormClusterState stormClusterState) throws Exception {
+ public void setupZkTaskInfo(Map<Object, Object> conf, String topologyId, StormClusterState stormClusterState) throws Exception {
+ Map<Integer, TaskInfo> taskToTaskInfo = mkTaskComponentAssignments(conf, topologyId);
- // mkdir /ZK/taskbeats/topoologyId
- stormClusterState.setup_heartbeats(topologyId);
- Map<Integer, TaskInfo> taskToTaskInfo =
- mkTaskComponentAssignments(conf, topologyId);
if (taskToTaskInfo == null || taskToTaskInfo.size() == 0) {
throw new InvalidTopologyException("Failed to generate TaskIDs
map");
}
+ // Register the topology heartbeat (in data.getTasksHeartbeat() and in the cluster state)
+ // only after the task map has been validated, so that a topology rejected by the check
+ // above never reaches getTopologyMasterId and leaves no heartbeat entry behind.
+ int masterId = NimbusUtils.getTopologyMasterId(taskToTaskInfo);
+ TopologyTaskHbInfo topoTaskHbinfo = new TopologyTaskHbInfo(topologyId, masterId);
+ data.getTasksHeartbeat().put(topologyId, topoTaskHbinfo);
+ stormClusterState.topology_heartbeat(topologyId, topoTaskHbinfo);
- // key is taskid, value is taskinfo
+ // key is taskid, value is taskinfo
stormClusterState.set_task(topologyId, taskToTaskInfo);
}
/**
* generate a taskid(Integer) for every task
- *
+ *
* @param conf
* @param topologyid
* @return Map<Integer, String>: from taskid to componentid
* @throws IOException
* @throws InvalidTopologyException
*/
- public Map<Integer, TaskInfo> mkTaskComponentAssignments(
- Map<Object, Object> conf, String topologyid) throws IOException,
- InvalidTopologyException {
+ public Map<Integer, TaskInfo> mkTaskComponentAssignments(Map<Object,
Object> conf, String topologyid) throws IOException, InvalidTopologyException {
// @@@ here exist a little problem,
// we can directly pass stormConf from Submit method
- Map<Object, Object> stormConf =
- StormConfig.read_nimbus_topology_conf(conf, topologyid);
-
- StormTopology stopology =
- StormConfig.read_nimbus_topology_code(conf, topologyid);
-
+ Map<Object, Object> stormConf =
StormConfig.read_nimbus_topology_conf(conf, topologyid);
+ StormTopology stopology = StormConfig.read_nimbus_topology_code(conf,
topologyid);
StormTopology topology = Common.system_topology(stormConf, stopology);
-
return Common.mkTaskInfo(stormConf, topology, topologyid);
}
-
-
@Override
- public void metricMonitor(String topologyName, MonitorOptions options)
- throws NotAliveException, TException {
+ public void metricMonitor(String topologyName, MonitorOptions options)
throws TException {
boolean isEnable = options.is_isEnable();
StormClusterState clusterState = data.getStormClusterState();
try {
- String topologyId =
- Cluster.get_topology_id(clusterState, topologyName);
+ String topologyId = Cluster.get_topology_id(clusterState,
topologyName);
if (null != topologyId) {
clusterState.set_storm_monitor(topologyId, isEnable);
} else {
- throw new NotAliveException(
- "Failed to update metricsMonitor status as "
- + topologyName + " is not alive");
+ throw new NotAliveException("Failed to update metricsMonitor
status as " + topologyName + " is not alive");
}
} catch (Exception e) {
String errMsg = "Failed to update metricsMonitor " + topologyName;
@@ -1387,137 +1240,263 @@ public class ServiceHandler implements Iface,
Shutdownable, DaemonCommon {
}
@Override
- public TopologyMetric getTopologyMetric(String topologyId)
- throws NotAliveException, TException {
- LOG.debug("Nimbus service handler, getTopologyMetric, topology ID: "
- + topologyId);
+ public TopologyMetric getTopologyMetrics(String topologyId) throws
TException {
+ LOG.debug("Nimbus service handler, getTopologyMetric, topology ID: " +
topologyId);
long start = System.nanoTime();
try {
- TopologyMetric metric =
- data.getMetricRunnable().getTopologyMetric(topologyId);
-
- return metric;
- }finally {
- double spend = ( System.nanoTime()- start)/1000000.0d;;
- SimpleJStormMetric.updateHistorgram("getTopologyMetric", spend);
- LOG.info("getTopologyMetric, {}:{}", topologyId, spend);
+ return data.getMetricRunnable().getTopologyMetric(topologyId);
+ } finally {
+ long end = System.nanoTime();
+ SimpleJStormMetric.updateNimbusHistogram("getTopologyMetric", (end
- start) / TimeUtils.NS_PER_US);
}
}
@Override
- public void workerUploadMetric(WorkerUploadMetrics uploadMetrics)
- throws TException {
- // TODO Auto-generated method stub
- LOG.debug("!!!!!!! workerUploadMetric:{}:{}:{} ",
uploadMetrics.get_topology_id(),
- uploadMetrics.get_supervisor_id(), uploadMetrics.get_port());
+ public void uploadTopologyMetrics(String topologyId, TopologyMetric
uploadMetrics) throws TException {
+ LOG.info("Received topology metrics:{}", topologyId);
- TopologyMetricsRunnable.Update event =
- new TopologyMetricsRunnable.Update();
- event.workerMetrics = uploadMetrics;
+ Update event = new Update();
+ event.timestamp = System.currentTimeMillis();
+ event.topologyMetrics = uploadMetrics;
+ event.topologyId = topologyId;
data.getMetricRunnable().pushEvent(event);
}
+ /**
+ * Registers the given metric names for a topology by delegating to the metric runnable.
+ *
+ * @param topologyId topology the metric names belong to
+ * @param metrics metric names to register
+ * @return whatever the metric runnable returns (presumably metric name to assigned id --
+ * confirm against TopologyMetricsRunnable), or null when registration fails
+ */
+ @Override
+ public Map<String, Long> registerMetrics(String topologyId, Set<String> metrics) throws TException {
+ try {
+ return data.getMetricRunnable().registerMetrics(topologyId, metrics);
+ } catch (Exception ex) {
+ // The null result is kept for callers that already handle it, but the failure
+ // is no longer dropped without a trace.
+ LOG.error("Failed to register metrics of " + topologyId, ex);
+ return null;
+ }
+ }
+
public void uploadNewCredentials(String topologyName, Credentials creds) {
- // TODO Auto-generated method stub
-
}
@Override
- public NettyMetric getNettyMetric(String topologyName, int pos) throws
TException {
- // TODO Auto-generated method stub
- long start = System.nanoTime();
- try {
- String topologyId = getTopologyId(topologyName);
-
- if (pos < 0) {
- LOG.warn("Invalid pos {}, set it as 0", pos);
- pos = 0;
- }
- SortedMap<String, MetricInfo> allConnections =
data.getMetricRunnable().getNettyMetric(topologyId);
- int mapSize = allConnections.size();
-
- NettyMetric ret = new NettyMetric();
-
- ret.set_connectionNum(mapSize);
-
-
- Map<String, MetricInfo> selectConnections = new TreeMap<String,
MetricInfo>();
- ret.set_connections(selectConnections);
- int i = 0;
- int selectMapSize = 0;
- for (Entry<String, MetricInfo> entry: allConnections.entrySet()) {
- i++;
- if (i <= pos) {
- continue;
+ public List<MetricInfo> getMetrics(String topologyId, int type) throws
TException {
+ MetaType metaType = MetaType.parse(type);
+ return data.getMetricCache().getMetricData(topologyId, metaType);
+ }
+
+ @Override
+ public MetricInfo getNettyMetrics(String topologyId) throws TException {
+ List<MetricInfo> metricInfoList =
data.getMetricCache().getMetricData(topologyId, MetaType.NETTY);
+ if (metricInfoList != null && metricInfoList.size() > 0) {
+ return metricInfoList.get(0);
+ }
+ return new MetricInfo();
+ }
+
+ @Override
+ public MetricInfo getNettyMetricsByHost(String topologyId, String host)
throws TException {
+ MetricInfo ret = new MetricInfo();
+
+ List<MetricInfo> metricInfoList =
data.getMetricCache().getMetricData(topologyId, MetaType.NETTY);
+ if (metricInfoList != null && metricInfoList.size() > 0) {
+ MetricInfo metricInfo = metricInfoList.get(0);
+ for (Entry<String, Map<Integer, MetricSnapshot>> metricEntry :
metricInfo.get_metrics().entrySet()) {
+ String metricName = metricEntry.getKey();
+ Map<Integer, MetricSnapshot> data = metricEntry.getValue();
+ if (metricName.contains(host)) {
+ ret.put_to_metrics(metricName, data);
}
-
- selectConnections.put(entry.getKey(), entry.getValue());
- selectMapSize++;
- if (selectMapSize >= MetricDef.NETTY_METRICS_PACKAGE_SIZE) {
- break;
+ }
+ }
+
+ LOG.info("getNettyMetricsByHost, total size:{}",
ret.get_metrics_size());
+ return ret;
+ }
+
+ @Override
+ public int getNettyMetricSizeByHost(String topologyId, String host) throws
TException {
+ return getNettyMetricsByHost(topologyId, host).get_metrics_size();
+ }
+
+ /**
+ * Returns one page of the cached netty metrics whose names contain {@code host}.
+ * Page p covers the matching entries with index in
+ * [(p - 1) * NETTY_METRIC_PAGE_SIZE, p * NETTY_METRIC_PAGE_SIZE), counted in the
+ * iteration order of the cached metric map.
+ *
+ * @param topologyId topology whose netty metrics are read from the metric cache
+ * @param host substring a metric name must contain to be selected
+ * @param page page number, starting at 1; a page with no matching entries yields an empty result
+ * @return a new MetricInfo holding only the entries of the requested page
+ */
+ @Override
+ public MetricInfo getPagingNettyMetrics(String topologyId, String host, int page) throws TException {
+ MetricInfo ret = new MetricInfo();
+
+ int start = (page - 1) * MetricUtils.NETTY_METRIC_PAGE_SIZE;
+ int end = page * MetricUtils.NETTY_METRIC_PAGE_SIZE;
+ // index of the most recent matching entry; -1 until the first match
+ int cur = -1;
+ List<MetricInfo> metricInfoList = data.getMetricCache().getMetricData(topologyId, MetaType.NETTY);
+ if (metricInfoList != null && metricInfoList.size() > 0) {
+ MetricInfo metricInfo = metricInfoList.get(0);
+ for (Entry<String, Map<Integer, MetricSnapshot>> metricEntry : metricInfo.get_metrics().entrySet()) {
+ String metricName = metricEntry.getKey();
+ // named "snapshots" so the local does not shadow the "data" field used above
+ Map<Integer, MetricSnapshot> snapshots = metricEntry.getValue();
+ if (metricName.contains(host)) {
+ ++cur;
+ if (cur >= start && cur < end) {
+ ret.put_to_metrics(metricName, snapshots);
+ }
+ if (cur >= end) {
+ break;
+ }
}
}
-
- return ret;
- }finally {
- double spend = (System.nanoTime() - start)/1000000.0d;
- SimpleJStormMetric.updateHistorgram("getNettyMetric", spend );
- LOG.info("getNettyMetric, {}:{} ms", topologyName, spend);
}
+
+ LOG.info("getPagingNettyMetrics, page size:{}", ret.get_metrics_size());
+ return ret;
}
+ /**
+ * Returns the task metrics of one component of a topology.
+ * A metric is kept when its name, split on MetricUtils.DELIM, has at least 7 parts
+ * and the third part equals {@code component}.
+ *
+ * @param topologyId topology whose task metrics are read
+ * @param component component name to select
+ * @return a new MetricInfo with the selected metrics; it has no entries when nothing is cached or nothing matches
+ */
+ @Override
+ public MetricInfo getTaskMetrics(String topologyId, String component) throws TException {
+ // Matches are collected into a fresh MetricInfo, the same way getNettyMetricsByHost
+ // does, instead of deleting keys from the object returned by the metric cache.
+ // NOTE(review): whether the cache returns a live object or a copy is not visible
+ // here; copying is safe in either case.
+ MetricInfo ret = MetricUtils.mkMetricInfo();
+ List<MetricInfo> taskMetricList = getMetrics(topologyId, MetaType.TASK.getT());
+ if (taskMetricList != null && taskMetricList.size() > 0) {
+ Map<String, Map<Integer, MetricSnapshot>> metrics = taskMetricList.get(0).get_metrics();
+ for (Entry<String, Map<Integer, MetricSnapshot>> metricEntry : metrics.entrySet()) {
+ String metricName = metricEntry.getKey();
+ String[] parts = metricName.split(MetricUtils.DELIM);
+ if (parts.length >= 7 && parts[2].equals(component)) {
+ ret.put_to_metrics(metricName, metricEntry.getValue());
+ }
+ }
+ LOG.info("taskMetric, total size:{}", ret.get_metrics_size());
+ }
+ return ret;
+ }
@Override
- public NettyMetric getServerNettyMetric(String topologyName, String
serverName) throws TException {
- // TODO Auto-generated method stub
- long start = System.nanoTime();
- try {
- String topologyId = getTopologyId(topologyName);
-
- SortedMap<String, MetricInfo> allConnections =
data.getMetricRunnable().getNettyMetric(topologyId);
- int mapSize = allConnections.size();
-
- NettyMetric ret = new NettyMetric();
-
- String serverIp = NetWorkUtils.host2Ip(serverName);
- Map<String, MetricInfo> selectConnections = new TreeMap<String,
MetricInfo>();
- for (Entry<String, MetricInfo> entry: allConnections.entrySet()) {
- if (entry.getKey().contains(serverIp)) {
- selectConnections.put(entry.getKey(), entry.getValue());
+ public List<MetricInfo> getTaskAndStreamMetrics(String topologyId, int
taskId) throws TException {
+ List<MetricInfo> taskMetricList = getMetrics(topologyId,
MetaType.TASK.getT());
+ List<MetricInfo> streamMetricList = getMetrics(topologyId,
MetaType.STREAM.getT());
+
+ String taskIdStr = taskId + "";
+ MetricInfo taskMetricInfo;
+ if (taskMetricList != null && taskMetricList.size() > 0) {
+ taskMetricInfo = taskMetricList.get(0);
+ Map<String, Map<Integer, MetricSnapshot>> metrics =
taskMetricInfo.get_metrics();
+ for (Iterator<String> itr = metrics.keySet().iterator();
itr.hasNext(); ) {
+ String metricName = itr.next();
+ String[] parts = metricName.split(MetricUtils.DELIM);
+ if (parts.length < 7 || !parts[3].equals(taskIdStr)) {
+ itr.remove();
}
-
}
- ret.set_connectionNum(selectConnections.size());
- ret.set_connections(selectConnections);
- return ret;
- }finally {
- double spend = (System.nanoTime()- start)/1000000.0d;
- SimpleJStormMetric.updateHistorgram("getNettyMetric", spend);
- LOG.info("getServerNettyMetric, {} : {}ms", topologyName, spend);
+ } else {
+ taskMetricInfo = MetricUtils.mkMetricInfo();
}
+
+ MetricInfo streamMetricInfo;
+ if (streamMetricList != null && streamMetricList.size() > 0) {
+ streamMetricInfo = streamMetricList.get(0);
+ Map<String, Map<Integer, MetricSnapshot>> metrics =
streamMetricInfo.get_metrics();
+ for (Iterator<String> itr = metrics.keySet().iterator();
itr.hasNext(); ) {
+ String metricName = itr.next();
+ String[] parts = metricName.split(MetricUtils.DELIM);
+ if (parts.length < 7 || !parts[3].equals(taskIdStr)) {
+ itr.remove();
+ }
+ }
+ } else {
+ streamMetricInfo = MetricUtils.mkMetricInfo();
+ }
+ return Lists.newArrayList(taskMetricInfo, streamMetricInfo);
}
@Override
- public void updateConf(String name, String conf) throws NotAliveException,
- InvalidTopologyException, TException {
+ public List<MetricInfo> getSummarizedTopologyMetrics(String topologyId)
throws TException { // Returns whatever the metric cache holds for topologyId under MetaType.TOPOLOGY; the result is passed through unchanged.
+ return data.getMetricCache().getMetricData(topologyId,
MetaType.TOPOLOGY);
+ }
+
+ @Override
+ public void updateTopology(String name, String uploadedLocation,
+ String updateConf) throws NotAliveException,
+ InvalidTopologyException, TException { // Optionally installs an uploaded jar, merges updateConf (JSON) into the stored topology conf, then triggers the update_topology transition.
try {
checkTopologyActive(data, name, true);
+ String topologyId = null;
+ StormClusterState stormClusterState = data.getStormClusterState();
+ topologyId = Cluster.get_topology_id(stormClusterState, name);
+ if (topologyId == null) { // name did not resolve to a topology id
+ throw new NotAliveException(name);
+ }
+ if (uploadedLocation != null) { // jar replacement is skipped entirely when no upload location is given
+ String stormroot =
+ StormConfig.masterStormdistRoot(conf, topologyId); // NOTE(review): uses bare 'conf' while the rest of this method uses data.getConf() - confirm both refer to the same configuration
+
+ int lastIndexOf = uploadedLocation.lastIndexOf("/"); // NOTE(review): -1 when the path contains no "/", which makes the substring call below throw - confirm callers always pass a path with a separator
+ // /local-dir/nimbus/inbox/xxxx/
+ String tmpDir = uploadedLocation.substring(0, lastIndexOf);
+
+ // /local-dir/nimbus/inbox/xxxx/stormjar.jar
+ String stormJarPath = StormConfig.stormjar_path(tmpDir);
+
+ File file = new File(uploadedLocation);
+ if (file.exists()) {
+ file.renameTo(new File(stormJarPath)); // NOTE(review): renameTo's boolean result is ignored, so a failed rename goes unnoticed here
+ } else {
+ throw new FileNotFoundException("Source \'"
+ + uploadedLocation + "\' does not exist"); // reaches the caller as a TException via the generic catch at the end of this method
+ }
+ // move fileDir to /local-dir/nimbus/topologyid/
+ File srcDir = new File(tmpDir);
+ File destDir = new File(stormroot);
+ try {
+ FileUtils.moveDirectory(srcDir, destDir);
+ } catch (FileExistsException e) { // move refused because destDir already exists: copy into it and then drop the source dir
+ FileUtils.copyDirectory(srcDir, destDir);
+ FileUtils.deleteQuietly(srcDir);
+ }
+ // Update downloadCode timeStamp
+
StormConfig.write_nimbus_topology_timestamp(data.getConf(), topologyId,
System.currentTimeMillis());
+ LOG.info("update jar of " + name + " successfully");
+ }
+
+ Map topoConf =
StormConfig.read_nimbus_topology_conf(data.getConf(),
+ topologyId);
Map<Object, Object> config =
- (Map<Object, Object>) JStormUtils.from_json(conf);
+ (Map<Object, Object>) JStormUtils.from_json(updateConf); // NOTE(review): the parsed result is used unchecked - presumably a null/blank updateConf makes putAll below throw; confirm
+ topoConf.putAll(config); // entries from updateConf overwrite same-key entries of the stored conf
+ StormConfig.write_nimbus_topology_conf(data.getConf(), topologyId,
+ topoConf);
+ LOG.info("update topology " + name + " successfully");
NimbusUtils.transitionName(data, name, true,
- StatusType.update_conf, config);
+ StatusType.update_topology, config); // only the parsed delta (config) is handed to the transition, not the merged topoConf
+
} catch (NotAliveException e) {
- String errMsg = "Rebalance Error, no this topology " + name;
+ String errMsg = "Error, no this topology " + name; // rethrown below as a new NotAliveException carrying this message
LOG.error(errMsg, e);
throw new NotAliveException(errMsg);
} catch (Exception e) {
- String errMsg = "Failed to rebalance topology " + name;
+ String errMsg = "Failed to update topology " + name; // NOTE(review): the TException thrown below carries only this message; the original cause is logged but not chained
LOG.error(errMsg, e);
throw new TException(errMsg);
}
+
+ }
+
+ @Override
+ public void updateTaskHeartbeat(TopologyTaskHbInfo taskHbs) throws
TException { // Merges the reported task heartbeats into the per-topology record kept in data.getTasksHeartbeat().
+ String topologyId = taskHbs.get_topologyId();
+ Integer topologyMasterId = taskHbs.get_topologyMasterId();
+ TopologyTaskHbInfo nimbusTaskHbs =
data.getTasksHeartbeat().get(topologyId);
+
+ if (nimbusTaskHbs == null) { // no record for this topology yet: create one and register it
+ nimbusTaskHbs = new TopologyTaskHbInfo(topologyId,
topologyMasterId);
+ data.getTasksHeartbeat().put(topologyId, nimbusTaskHbs); // NOTE(review): get-then-put is not atomic - confirm calls for the same topology cannot run concurrently
+ }
+
+ Map<Integer, TaskHeartbeat> nimbusTaskHbMap =
nimbusTaskHbs.get_taskHbs();
+ if (nimbusTaskHbMap == null) { // record has no heartbeat map yet: install a ConcurrentHashMap
+ nimbusTaskHbMap = new ConcurrentHashMap<Integer, TaskHeartbeat>();
+ nimbusTaskHbs.set_taskHbs(nimbusTaskHbMap);
+ }
+ Map<Integer, TaskHeartbeat> taskHbMap = taskHbs.get_taskHbs();
+ if (taskHbMap != null) { // a report without a heartbeat map changes nothing
+ for (Entry<Integer, TaskHeartbeat> entry : taskHbMap.entrySet()) {
+ nimbusTaskHbMap.put(entry.getKey(), entry.getValue()); // a reported heartbeat replaces any stored entry for the same task id
+ }
+ }
}
}