http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusData.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusData.java index 02b574f..3871074 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusData.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusData.java @@ -17,36 +17,37 @@ */ package com.alibaba.jstorm.daemon.nimbus; -import java.io.IOException; -import java.nio.channels.Channel; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import backtype.storm.Config; +import backtype.storm.generated.TopologyTaskHbInfo; +import backtype.storm.scheduler.INimbus; +import backtype.storm.utils.BufferFileInputStream; +import backtype.storm.utils.TimeCacheMap; import com.alibaba.jstorm.cache.JStormCache; +import com.alibaba.jstorm.callback.AsyncLoopThread; +import com.alibaba.jstorm.client.ConfigExtension; import com.alibaba.jstorm.cluster.Cluster; import com.alibaba.jstorm.cluster.StormClusterState; import com.alibaba.jstorm.cluster.StormConfig; import com.alibaba.jstorm.cluster.StormZkClusterState; +import com.alibaba.jstorm.metric.JStormMetricCache; +import com.alibaba.jstorm.metric.JStormMetricsReporter; import com.alibaba.jstorm.task.TkHbCacheTime; import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.utils.TimeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import backtype.storm.Config; -import backtype.storm.scheduler.INimbus; -import backtype.storm.utils.BufferFileInputStream; -import backtype.storm.utils.TimeCacheMap; +import java.io.IOException; +import java.nio.channels.Channel; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; /** * All nimbus data - * */ public class NimbusData { private static final Logger LOG = LoggerFactory.getLogger(NimbusData.class); @@ -62,7 +63,7 @@ public class NimbusData { private TimeCacheMap<Object, Object> downloaders; private TimeCacheMap<Object, Object> uploaders; // cache thrift response to avoid scan zk too frequently - private NimbusCache cache; + private NimbusCache nimbusCache; private int startTime; @@ -82,17 +83,24 @@ public class NimbusData { private AtomicBoolean isShutdown = new AtomicBoolean(false); - private final TopologyMetricsRunnable metricRunnable; + private TopologyMetricsRunnable metricRunnable; + private AsyncLoopThread metricLoopThread; // The topologys which has been submitted, but the assignment is not // finished private TimeCacheMap<String, Object> pendingSubmitTopologys; - private Map<String, Integer> topologyTaskTimeout; - - private TopologyNettyMgr topologyNettyMgr ; - @SuppressWarnings({ "unchecked", "rawtypes" }) + // Map<TopologyId, TasksHeartbeat> + private Map<String, TopologyTaskHbInfo> tasksHeartbeat; + + private final JStormMetricCache metricCache; + + private final String 
clusterName; + + private JStormMetricsReporter metricsReporter; + + @SuppressWarnings({"unchecked", "rawtypes"}) public NimbusData(Map conf, INimbus inimbus) throws Exception { this.conf = conf; @@ -104,8 +112,7 @@ public class NimbusData { createCache(); - this.taskHeartbeatsCache = - new ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>>(); + this.taskHeartbeatsCache = new ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>>(); this.scheduExec = Executors.newScheduledThreadPool(SCHEDULE_THREAD_NUM); @@ -117,66 +124,63 @@ public class NimbusData { localMode = StormConfig.local_mode(conf); - this.topologyNettyMgr = new TopologyNettyMgr(conf); + this.metricCache = new JStormMetricCache(conf, this.stormClusterState); + this.clusterName = ConfigExtension.getClusterName(conf); + this.metricRunnable = new TopologyMetricsRunnable(this); + this.metricRunnable.init(); - pendingSubmitTopologys = - new TimeCacheMap<String, Object>(JStormUtils.MIN_30); - + pendingSubmitTopologys = new TimeCacheMap<String, Object>(JStormUtils.MIN_30); topologyTaskTimeout = new ConcurrentHashMap<String, Integer>(); + tasksHeartbeat = new ConcurrentHashMap<String, TopologyTaskHbInfo>(); + + if (!localMode) { + startMetricThreads(); + } } - /** - * Just for test - */ - public NimbusData() { - scheduExec = Executors.newScheduledThreadPool(6); + public void startMetricThreads() { + this.metricRunnable.start(); - inimubs = null; - conf = new HashMap<Object, Object>(); - localMode = false; - this.metricRunnable = new TopologyMetricsRunnable(this); + // init nimbus metric reporter + this.metricsReporter = new JStormMetricsReporter(this); + this.metricsReporter.init(); } public void createFileHandler() { - TimeCacheMap.ExpiredCallback<Object, Object> expiredCallback = - new TimeCacheMap.ExpiredCallback<Object, Object>() { - @Override - public void expire(Object key, Object val) { - try { - LOG.info("Close file " + String.valueOf(key)); - if (val != null) { - if (val instanceof Channel) { - Channel channel = (Channel) val; - channel.close(); - } else if (val instanceof BufferFileInputStream) { - BufferFileInputStream is = - (BufferFileInputStream) val; - is.close(); - } - } - } catch (IOException e) { - LOG.error(e.getMessage(), e); + TimeCacheMap.ExpiredCallback<Object, Object> expiredCallback = new TimeCacheMap.ExpiredCallback<Object, Object>() { + @Override + public void expire(Object key, Object val) { + try { + LOG.info("Close file " + String.valueOf(key)); + if (val != null) { + if (val instanceof Channel) { + Channel channel = (Channel) val; + channel.close(); + } else if (val instanceof BufferFileInputStream) { + BufferFileInputStream is = (BufferFileInputStream) val; + is.close(); } - } - }; + } catch (IOException e) { + LOG.error(e.getMessage(), e); + } + + } + }; - int file_copy_expiration_secs = - JStormUtils.parseInt( - conf.get(Config.NIMBUS_FILE_COPY_EXPIRATION_SECS), 30); - uploaders = - new TimeCacheMap<Object, Object>(file_copy_expiration_secs, - expiredCallback); - downloaders = - new TimeCacheMap<Object, Object>(file_copy_expiration_secs, - expiredCallback); + int file_copy_expiration_secs = JStormUtils.parseInt(conf.get(Config.NIMBUS_FILE_COPY_EXPIRATION_SECS), 30); + uploaders = new TimeCacheMap<Object, Object>(file_copy_expiration_secs, expiredCallback); + downloaders = new TimeCacheMap<Object, Object>(file_copy_expiration_secs, expiredCallback); } public void createCache() throws IOException { - cache = new NimbusCache(conf, stormClusterState); - - ((StormZkClusterState) 
stormClusterState).setCache(cache.getMemCache()); + nimbusCache = new NimbusCache(conf, stormClusterState); + ((StormZkClusterState) stormClusterState).setCache(nimbusCache.getMemCache()); + } + + public String getClusterName() { + return clusterName; } public int uptime() { @@ -203,8 +207,7 @@ public class NimbusData { return taskHeartbeatsCache; } - public Map<Integer, TkHbCacheTime> getTaskHeartbeatsCache( - String topologyId, boolean createIfNotExist) { + public Map<Integer, TkHbCacheTime> getTaskHeartbeatsCache(String topologyId, boolean createIfNotExist) { Map<Integer, TkHbCacheTime> ret = null; ret = taskHeartbeatsCache.get(topologyId); if (ret == null && createIfNotExist) { @@ -214,8 +217,7 @@ public class NimbusData { return ret; } - public void setTaskHeartbeatsCache( - ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>> taskHeartbeatsCache) { + public void setTaskHeartbeatsCache(ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>> taskHeartbeatsCache) { this.taskHeartbeatsCache = taskHeartbeatsCache; } @@ -256,7 +258,7 @@ public class NimbusData { } public void cleanup() { - cache.cleanup(); + nimbusCache.cleanup(); LOG.info("Successfully shutdown Cache"); try { stormClusterState.disconnect(); @@ -296,15 +298,19 @@ public class NimbusData { } public JStormCache getMemCache() { - return cache.getMemCache(); + return nimbusCache.getMemCache(); } - + public JStormCache getDbCache() { - return cache.getDbCache(); + return nimbusCache.getDbCache(); } - + public NimbusCache getNimbusCache() { - return cache; + return nimbusCache; + } + + public JStormMetricCache getMetricCache() { + return metricCache; } public final TopologyMetricsRunnable getMetricRunnable() { @@ -319,9 +325,7 @@ public class NimbusData { return topologyTaskTimeout; } - public TopologyNettyMgr getTopologyNettyMgr() { - return topologyNettyMgr; - } - - + public Map<String, TopologyTaskHbInfo> getTasksHeartbeat() { + return tasksHeartbeat; + } }
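Editor's note on the NimbusData change above: the per-task ZooKeeper heartbeat reads are replaced by a single in-memory map, Map<String, TopologyTaskHbInfo> tasksHeartbeat, keyed by topology id and exposed through getTasksHeartbeat(). Below is a minimal sketch of how a caller can resolve a task's last reported heartbeat time through that map; the helper class and method names are illustrative only, while the thrift-generated accessors get_taskHbs() and get_time() are the ones used in the patch.

import java.util.Map;

import backtype.storm.generated.TaskHeartbeat;      // thrift-generated per-task heartbeat
import backtype.storm.generated.TopologyTaskHbInfo; // thrift-generated per-topology heartbeat bundle

public class TaskHbLookup {
    /**
     * Returns the last reported heartbeat time (in seconds) of a task, or null
     * if nothing has been reported for it yet. This mirrors the lookup that
     * NimbusUtils.isTaskDead() now performs against NimbusData.getTasksHeartbeat()
     * instead of reading each task's heartbeat from ZooKeeper.
     */
    public static Integer lastReportTime(Map<String, TopologyTaskHbInfo> tasksHeartbeat,
                                         String topologyId, int taskId) {
        TopologyTaskHbInfo topoHb = tasksHeartbeat.get(topologyId);
        if (topoHb == null) {
            return null;                              // topology not registered yet
        }
        Map<Integer, TaskHeartbeat> taskHbs = topoHb.get_taskHbs();
        if (taskHbs == null) {
            return null;                              // no task heartbeat reported yet
        }
        TaskHeartbeat hb = taskHbs.get(taskId);
        if (hb == null) {
            return null;                              // this task has not reported yet
        }
        return hb.get_time();                         // autoboxed if the getter returns a primitive int
    }
}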
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusServer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusServer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusServer.java index b22088e..5d5e18c 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusServer.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusServer.java @@ -17,59 +17,51 @@ */ package com.alibaba.jstorm.daemon.nimbus; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.server.THsHaServer; -import org.apache.thrift.transport.TNonblockingServerSocket; -import org.apache.thrift.transport.TTransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import backtype.storm.Config; import backtype.storm.generated.Nimbus; import backtype.storm.generated.Nimbus.Iface; import backtype.storm.scheduler.INimbus; import backtype.storm.utils.Utils; - import com.alibaba.jstorm.callback.AsyncLoopRunnable; import com.alibaba.jstorm.callback.AsyncLoopThread; import com.alibaba.jstorm.client.ConfigExtension; import com.alibaba.jstorm.cluster.StormConfig; import com.alibaba.jstorm.daemon.supervisor.Httpserver; import com.alibaba.jstorm.daemon.worker.hearbeat.SyncContainerHb; -import com.alibaba.jstorm.metric.SimpleJStormMetric; import com.alibaba.jstorm.schedule.CleanRunnable; import com.alibaba.jstorm.schedule.FollowerRunnable; import com.alibaba.jstorm.schedule.MonitorRunnable; import com.alibaba.jstorm.utils.JStormServerUtils; import com.alibaba.jstorm.utils.JStormUtils; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.server.THsHaServer; +import org.apache.thrift.transport.TNonblockingServerSocket; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * - * NimbusServer work flow: 1. cleanup interrupted topology delete - * /storm-local-dir/nimbus/topologyid/stormdis delete - * /storm-zk-root/storms/topologyid + * NimbusServer work flow: 1. cleanup interrupted topology delete /storm-local-dir/nimbus/topologyid/stormdis delete /storm-zk-root/storms/topologyid * * 2. set /storm-zk-root/storms/topology stats as run * - * 3. start one thread, every nimbus.monitor.reeq.secs set - * /storm-zk-root/storms/ all topology as monitor. when the topology's status is - * monitor, nimubs would reassign workers 4. start one threa, every - * nimubs.cleanup.inbox.freq.secs cleanup useless jar + * 3. start one thread, every nimbus.monitor.reeq.secs set /storm-zk-root/storms/ all topology as monitor. when the topology's status is monitor, nimubs would + * reassign workers 4. 
start one threa, every nimubs.cleanup.inbox.freq.secs cleanup useless jar * * @author version 1: Nathan Marz version 2: Lixin/Chenjun version 3: Longda * */ public class NimbusServer { - private static final Logger LOG = LoggerFactory - .getLogger(NimbusServer.class); + private static final Logger LOG = LoggerFactory.getLogger(NimbusServer.class); private NimbusData data; @@ -83,8 +75,7 @@ public class NimbusServer { private Httpserver hs; - private List<AsyncLoopThread> smartThreads = - new ArrayList<AsyncLoopThread>(); + private List<AsyncLoopThread> smartThreads = new ArrayList<AsyncLoopThread>(); public static void main(String[] args) throws Exception { // read configuration files @@ -134,8 +125,6 @@ public class NimbusServer { while (!data.isLeader()) Utils.sleep(5000); - initUploadMetricThread(data); - init(conf); } catch (Throwable e) { LOG.error("Fail to run nimbus ", e); @@ -146,8 +135,7 @@ public class NimbusServer { LOG.info("Quit nimbus"); } - public ServiceHandler launcherLocalServer(final Map conf, INimbus inimbus) - throws Exception { + public ServiceHandler launcherLocalServer(final Map conf, INimbus inimbus) throws Exception { LOG.info("Begin to start nimbus on local model"); StormConfig.validate_local_mode(conf); @@ -156,9 +144,6 @@ public class NimbusServer { data = createNimbusData(conf, inimbus); - // @@@ testing - initUploadMetricThread(data); - init(conf); return serviceHandler; @@ -184,6 +169,8 @@ public class NimbusServer { serviceHandler = new ServiceHandler(data); if (!data.isLocalMode()) { + + //data.startMetricThreads(); initMonitor(conf); @@ -193,15 +180,11 @@ public class NimbusServer { } @SuppressWarnings("rawtypes") - private NimbusData createNimbusData(Map conf, INimbus inimbus) - throws Exception { + private NimbusData createNimbusData(Map conf, INimbus inimbus) throws Exception { // Callback callback=new TimerCallBack(); // StormTimer timer=Timer.mkTimerTimer(callback); - NimbusData data = new NimbusData(conf, inimbus); - - return data; - + return new NimbusData(conf, inimbus); } private void initTopologyAssign() { @@ -218,9 +201,9 @@ public class NimbusServer { for (String topologyid : active_ids) { // set the topology status as startup // in fact, startup won't change anything - NimbusUtils.transition(data, topologyid, false, - StatusType.startup); + NimbusUtils.transition(data, topologyid, false, StatusType.startup); NimbusUtils.updateTopologyTaskTimeout(data, topologyid); + NimbusUtils.updateTopologyTaskHb(data, topologyid); } } @@ -235,20 +218,15 @@ public class NimbusServer { // Schedule Nimbus monitor MonitorRunnable r1 = new MonitorRunnable(data); - int monitor_freq_secs = - JStormUtils.parseInt(conf.get(Config.NIMBUS_MONITOR_FREQ_SECS), - 10); - scheduExec.scheduleAtFixedRate(r1, 0, monitor_freq_secs, - TimeUnit.SECONDS); + int monitor_freq_secs = JStormUtils.parseInt(conf.get(Config.NIMBUS_MONITOR_FREQ_SECS), 10); + scheduExec.scheduleAtFixedRate(r1, 0, monitor_freq_secs, TimeUnit.SECONDS); LOG.info("Successfully init Monitor thread"); } /** - * Right now, every 600 seconds, nimbus will clean jar under - * /LOCAL-DIR/nimbus/inbox, which is the uploading topology directory + * Right now, every 600 seconds, nimbus will clean jar under /LOCAL-DIR/nimbus/inbox, which is the uploading topology directory * - * @param conf * @throws IOException */ @SuppressWarnings("rawtypes") @@ -257,39 +235,25 @@ public class NimbusServer { // Schedule Nimbus inbox cleaner/nimbus/inbox jar String dir_location = StormConfig.masterInbox(conf); - int 
inbox_jar_expiration_secs = - JStormUtils - .parseInt(conf - .get(Config.NIMBUS_INBOX_JAR_EXPIRATION_SECS), - 3600); - CleanRunnable r2 = - new CleanRunnable(dir_location, inbox_jar_expiration_secs); + int inbox_jar_expiration_secs = JStormUtils.parseInt(conf.get(Config.NIMBUS_INBOX_JAR_EXPIRATION_SECS), 3600); + CleanRunnable r2 = new CleanRunnable(dir_location, inbox_jar_expiration_secs); - int cleanup_inbox_freq_secs = - JStormUtils.parseInt( - conf.get(Config.NIMBUS_CLEANUP_INBOX_FREQ_SECS), 600); - - scheduExec.scheduleAtFixedRate(r2, 0, cleanup_inbox_freq_secs, - TimeUnit.SECONDS); + int cleanup_inbox_freq_secs = JStormUtils.parseInt(conf.get(Config.NIMBUS_CLEANUP_INBOX_FREQ_SECS), 600); + scheduExec.scheduleAtFixedRate(r2, 0, cleanup_inbox_freq_secs, TimeUnit.SECONDS); LOG.info("Successfully init " + dir_location + " cleaner"); } @SuppressWarnings("rawtypes") private void initThrift(Map conf) throws TTransportException { - Integer thrift_port = - JStormUtils.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT)); - TNonblockingServerSocket socket = - new TNonblockingServerSocket(thrift_port); + Integer thrift_port = JStormUtils.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT)); + TNonblockingServerSocket socket = new TNonblockingServerSocket(thrift_port); - Integer maxReadBufSize = - JStormUtils.parseInt(conf - .get(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE)); + Integer maxReadBufSize = JStormUtils.parseInt(conf.get(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE)); THsHaServer.Args args = new THsHaServer.Args(socket); args.workerThreads(ServiceHandler.THREAD_NUM); - args.protocolFactory(new TBinaryProtocol.Factory(false, true, - maxReadBufSize, -1)); + args.protocolFactory(new TBinaryProtocol.Factory(false, true, maxReadBufSize, -1)); args.processor(new Nimbus.Processor<Iface>(serviceHandler)); args.maxReadBufferBytes = maxReadBufSize; @@ -317,53 +281,15 @@ public class NimbusServer { }); } - private void initUploadMetricThread(NimbusData data) { - final TopologyMetricsRunnable metricRunnable = data.getMetricRunnable(); - - int threadNum = ConfigExtension.getNimbusMetricThreadNum(data.getConf()); - - for (int i = 0; i < threadNum; i++) { - AsyncLoopThread thread = new AsyncLoopThread(metricRunnable); - smartThreads.add(thread); - } - - Runnable pusher = new Runnable() { - - @Override - public void run() { - // TODO Auto-generated method stub - TopologyMetricsRunnable.Upload event = - new TopologyMetricsRunnable.Upload(); - event.timeStamp = System.currentTimeMillis(); - - metricRunnable.pushEvent(event); - } - - }; - - ScheduledExecutorService scheduleService = data.getScheduExec(); - scheduleService.scheduleAtFixedRate(pusher, 120, 60, - TimeUnit.SECONDS); - - SimpleJStormMetric nimbusMetric = SimpleJStormMetric.mkInstance(); - scheduleService.scheduleAtFixedRate(nimbusMetric, 120, 60, - TimeUnit.SECONDS); - - //AsyncLoopThread nimbusCacheThread = new AsyncLoopThread(data.getNimbusCache().getCacheRunnable()); - //smartThreads.add(nimbusCacheThread); - - LOG.info("Successfully init metrics uploading thread"); - } - public void cleanup() { - if (data.getIsShutdown().getAndSet(true) == true) { + if (data.getIsShutdown().getAndSet(true)) { LOG.info("Notify to quit nimbus"); return; } LOG.info("Begin to shutdown nimbus"); AsyncLoopRunnable.getShutdown().set(true); - + data.getScheduExec().shutdownNow(); for (AsyncLoopThread t : smartThreads) { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusUtils.java 
---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusUtils.java index 7181e77..4e032e3 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusUtils.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusUtils.java @@ -17,53 +17,31 @@ */ package com.alibaba.jstorm.daemon.nimbus; -import java.io.BufferedReader; -import java.io.Closeable; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.security.InvalidParameterException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import backtype.storm.Config; -import backtype.storm.generated.Bolt; -import backtype.storm.generated.ComponentCommon; -import backtype.storm.generated.NimbusStat; -import backtype.storm.generated.NimbusSummary; -import backtype.storm.generated.NotAliveException; -import backtype.storm.generated.SpoutSpec; -import backtype.storm.generated.StateSpoutSpec; -import backtype.storm.generated.StormTopology; -import backtype.storm.generated.SupervisorSummary; -import backtype.storm.generated.TopologySummary; +import backtype.storm.generated.*; import backtype.storm.utils.ThriftTopologyUtils; import backtype.storm.utils.Utils; - import com.alibaba.jstorm.client.ConfigExtension; import com.alibaba.jstorm.cluster.Cluster; +import com.alibaba.jstorm.cluster.Common; import com.alibaba.jstorm.cluster.StormBase; import com.alibaba.jstorm.cluster.StormClusterState; import com.alibaba.jstorm.cluster.StormConfig; import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo; import com.alibaba.jstorm.schedule.Assignment; import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot; +import com.alibaba.jstorm.task.TaskInfo; import com.alibaba.jstorm.task.TkHbCacheTime; -import com.alibaba.jstorm.task.heartbeat.TaskHeartbeat; import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.utils.PathUtils; import com.alibaba.jstorm.utils.TimeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.security.InvalidParameterException; +import java.util.*; +import java.util.Map.Entry; public class NimbusUtils { @@ -71,7 +49,6 @@ public class NimbusUtils { /** * add coustom KRYO serialization - * */ private static Map mapifySerializations(List sers) { Map rtn = new HashMap(); @@ -92,8 +69,6 @@ public class NimbusUtils { /** * Normalize stormConf * - * - * * @param conf * @param stormConf * @param topology @@ -101,8 +76,7 @@ public class NimbusUtils { * @throws Exception */ @SuppressWarnings("rawtypes") - public static Map normalizeConf(Map conf, Map stormConf, - StormTopology topology) throws Exception { + public static Map normalizeConf(Map conf, Map stormConf, StormTopology topology) throws Exception { List kryoRegisterList = new ArrayList(); List kryoDecoratorList = new ArrayList(); @@ -113,18 +87,14 @@ public class NimbusUtils { Object totalRegister = totalConf.get(Config.TOPOLOGY_KRYO_REGISTER); if (totalRegister != null) { - LOG.info("topology:" + stormConf.get(Config.TOPOLOGY_NAME) - + ", TOPOLOGY_KRYO_REGISTER" - + 
totalRegister.getClass().getName()); + LOG.info("topology:" + stormConf.get(Config.TOPOLOGY_NAME) + ", TOPOLOGY_KRYO_REGISTER" + totalRegister.getClass().getName()); JStormUtils.mergeList(kryoRegisterList, totalRegister); } Object totalDecorator = totalConf.get(Config.TOPOLOGY_KRYO_DECORATORS); if (totalDecorator != null) { - LOG.info("topology:" + stormConf.get(Config.TOPOLOGY_NAME) - + ", TOPOLOGY_KRYO_DECORATOR" - + totalDecorator.getClass().getName()); + LOG.info("topology:" + stormConf.get(Config.TOPOLOGY_NAME) + ", TOPOLOGY_KRYO_DECORATOR" + totalDecorator.getClass().getName()); JStormUtils.mergeList(kryoDecoratorList, totalDecorator); } @@ -132,9 +102,7 @@ public class NimbusUtils { for (Iterator it = cids.iterator(); it.hasNext();) { String componentId = (String) it.next(); - ComponentCommon common = - ThriftTopologyUtils.getComponentCommon(topology, - componentId); + ComponentCommon common = ThriftTopologyUtils.getComponentCommon(topology, componentId); String json = common.get_json_conf(); if (json == null) { continue; @@ -150,24 +118,18 @@ public class NimbusUtils { throw new Exception(sb.toString()); } - Object componentKryoRegister = - mtmp.get(Config.TOPOLOGY_KRYO_REGISTER); + Object componentKryoRegister = mtmp.get(Config.TOPOLOGY_KRYO_REGISTER); if (componentKryoRegister != null) { - LOG.info("topology:" + stormConf.get(Config.TOPOLOGY_NAME) - + ", componentId:" + componentId - + ", TOPOLOGY_KRYO_REGISTER" + LOG.info("topology:" + stormConf.get(Config.TOPOLOGY_NAME) + ", componentId:" + componentId + ", TOPOLOGY_KRYO_REGISTER" + componentKryoRegister.getClass().getName()); JStormUtils.mergeList(kryoRegisterList, componentKryoRegister); } - Object componentDecorator = - mtmp.get(Config.TOPOLOGY_KRYO_DECORATORS); + Object componentDecorator = mtmp.get(Config.TOPOLOGY_KRYO_DECORATORS); if (componentDecorator != null) { - LOG.info("topology:" + stormConf.get(Config.TOPOLOGY_NAME) - + ", componentId:" + componentId - + ", TOPOLOGY_KRYO_DECORATOR" + LOG.info("topology:" + stormConf.get(Config.TOPOLOGY_NAME) + ", componentId:" + componentId + ", TOPOLOGY_KRYO_DECORATOR" + componentDecorator.getClass().getName()); JStormUtils.mergeList(kryoDecoratorList, componentDecorator); } @@ -177,25 +139,23 @@ public class NimbusUtils { Map kryoRegisterMap = mapifySerializations(kryoRegisterList); List decoratorList = JStormUtils.distinctList(kryoDecoratorList); - Integer ackerNum = - JStormUtils.parseInt(totalConf - .get(Config.TOPOLOGY_ACKER_EXECUTORS)); + Integer ackerNum = JStormUtils.parseInt(totalConf.get(Config.TOPOLOGY_ACKER_EXECUTORS)); if (ackerNum == null) { ackerNum = Integer.valueOf(1); } Map rtn = new HashMap(); + //ensure to be cluster_mode + rtn.put(Config.STORM_CLUSTER_MODE, conf.get(Config.STORM_CLUSTER_MODE)); rtn.putAll(stormConf); - rtn.put(Config.TOPOLOGY_KRYO_DECORATORS, decoratorList); + rtn.put(Config.TOPOLOGY_KRYO_DECORATORS, decoratorList); rtn.put(Config.TOPOLOGY_KRYO_REGISTER, kryoRegisterMap); rtn.put(Config.TOPOLOGY_ACKER_EXECUTORS, ackerNum); - rtn.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, - totalConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM)); + rtn.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, totalConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM)); return rtn; } - public static Integer componentParalism(Map stormConf, - ComponentCommon common) { + public static Integer componentParalism(Map stormConf, ComponentCommon common) { Map mergeMap = new HashMap(); mergeMap.putAll(stormConf); @@ -223,8 +183,7 @@ public class NimbusUtils { // } // } - Object 
maxTaskParalismObject = - mergeMap.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM); + Object maxTaskParalismObject = mergeMap.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM); if (maxTaskParalismObject == null) { return taskNum; } else { @@ -239,24 +198,19 @@ public class NimbusUtils { * finalize component's task paralism * * @param topology - * @param fromConf means if the paralism is read from conf file instead of - * reading from topology code + * @param fromConf means if the paralism is read from conf file instead of reading from topology code * @return */ - public static StormTopology normalizeTopology(Map stormConf, - StormTopology topology, boolean fromConf) { + public static StormTopology normalizeTopology(Map stormConf, StormTopology topology, boolean fromConf) { StormTopology ret = topology.deepCopy(); - Map<String, Object> rawComponents = - ThriftTopologyUtils.getComponents(topology); + Map<String, Object> rawComponents = ThriftTopologyUtils.getComponents(topology); Map<String, Object> components = ThriftTopologyUtils.getComponents(ret); if (rawComponents.keySet().equals(components.keySet()) == false) { - String errMsg = - "Failed to normalize topology binary, maybe due to wrong dependency"; - LOG.info(errMsg + " raw components:" + rawComponents.keySet() - + ", normalized " + components.keySet()); + String errMsg = "Failed to normalize topology binary, maybe due to wrong dependency"; + LOG.info(errMsg + " raw components:" + rawComponents.keySet() + ", normalized " + components.keySet()); throw new InvalidParameterException(errMsg); } @@ -269,9 +223,7 @@ public class NimbusUtils { if (component instanceof Bolt) { common = ((Bolt) component).get_common(); if (fromConf) { - Integer paraNum = - ConfigExtension.getBoltParallelism(stormConf, - componentName); + Integer paraNum = ConfigExtension.getBoltParallelism(stormConf, componentName); if (paraNum != null) { LOG.info("Set " + componentName + " as " + paraNum); common.set_parallelism_hint(paraNum); @@ -281,9 +233,7 @@ public class NimbusUtils { if (component instanceof SpoutSpec) { common = ((SpoutSpec) component).get_common(); if (fromConf) { - Integer paraNum = - ConfigExtension.getSpoutParallelism(stormConf, - componentName); + Integer paraNum = ConfigExtension.getSpoutParallelism(stormConf, componentName); if (paraNum != null) { LOG.info("Set " + componentName + " as " + paraNum); common.set_parallelism_hint(paraNum); @@ -293,9 +243,7 @@ public class NimbusUtils { if (component instanceof StateSpoutSpec) { common = ((StateSpoutSpec) component).get_common(); if (fromConf) { - Integer paraNum = - ConfigExtension.getSpoutParallelism(stormConf, - componentName); + Integer paraNum = ConfigExtension.getSpoutParallelism(stormConf, componentName); if (paraNum != null) { LOG.info("Set " + componentName + " as " + paraNum); common.set_parallelism_hint(paraNum); @@ -307,8 +255,7 @@ public class NimbusUtils { String jsonConfString = common.get_json_conf(); if (jsonConfString != null) { - componentMap - .putAll((Map) JStormUtils.from_json(jsonConfString)); + componentMap.putAll((Map) JStormUtils.from_json(jsonConfString)); } Integer taskNum = componentParalism(stormConf, common); @@ -328,20 +275,16 @@ public class NimbusUtils { * clean the topology which is in ZK but not in local dir * * @throws Exception - * */ - public static void cleanupCorruptTopologies(NimbusData data) - throws Exception { + public static void cleanupCorruptTopologies(NimbusData data) throws Exception { StormClusterState stormClusterState = data.getStormClusterState(); // get 
/local-storm-dir/nimbus/stormdist path - String master_stormdist_root = - StormConfig.masterStormdistRoot(data.getConf()); + String master_stormdist_root = StormConfig.masterStormdistRoot(data.getConf()); // listdir /local-storm-dir/nimbus/stormdist - List<String> code_ids = - PathUtils.read_dir_contents(master_stormdist_root); + List<String> code_ids = PathUtils.read_dir_contents(master_stormdist_root); // get topology in ZK /storms List<String> active_ids = data.getStormClusterState().active_storms(); @@ -352,9 +295,7 @@ public class NimbusUtils { } for (String corrupt : active_ids) { - LOG.info("Corrupt topology " - + corrupt - + " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up..."); + LOG.info("Corrupt topology " + corrupt + " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up..."); /** * Just removing the /STORMS is enough @@ -368,53 +309,47 @@ public class NimbusUtils { } - public static boolean isTaskDead(NimbusData data, String topologyId, - Integer taskId) { + public static boolean isTaskDead(NimbusData data, String topologyId, Integer taskId) { String idStr = " topology:" + topologyId + ",taskid:" + taskId; - Integer zkReportTime = null; + TopologyTaskHbInfo topoTasksHbInfo = data.getTasksHeartbeat().get(topologyId); + Map<Integer, TaskHeartbeat> taskHbMap = null; + Integer taskReportTime = null; - StormClusterState stormClusterState = data.getStormClusterState(); - TaskHeartbeat zkTaskHeartbeat = null; - try { - zkTaskHeartbeat = - stormClusterState.task_heartbeat(topologyId, taskId); - if (zkTaskHeartbeat != null) { - zkReportTime = zkTaskHeartbeat.getTimeSecs(); + if (topoTasksHbInfo != null) { + taskHbMap = topoTasksHbInfo.get_taskHbs(); + if (taskHbMap != null) { + TaskHeartbeat tHb = taskHbMap.get(taskId); + taskReportTime = ((tHb != null ) ? 
tHb.get_time() : null); } - } catch (Exception e) { - LOG.error("Failed to get ZK task hearbeat " + idStr, e); } - Map<Integer, TkHbCacheTime> taskHBs = - data.getTaskHeartbeatsCache(topologyId, true); + Map<Integer, TkHbCacheTime> taskHBs = data.getTaskHeartbeatsCache(topologyId, true); TkHbCacheTime taskHB = taskHBs.get(taskId); if (taskHB == null) { LOG.info("No task heartbeat cache " + idStr); - if (zkTaskHeartbeat == null) { - LOG.info("No ZK task hearbeat " + idStr); + if (topoTasksHbInfo == null || taskHbMap == null) { + LOG.info("No task hearbeat was reported for " + idStr); return true; } taskHB = new TkHbCacheTime(); - taskHB.update(zkTaskHeartbeat); + taskHB.update(taskHbMap.get(taskId)); taskHBs.put(taskId, taskHB); return false; } - if (zkReportTime == null) { - LOG.debug("No ZK task heartbeat " + idStr); + if (taskReportTime == null || taskReportTime < taskHB.getTaskAssignedTime()) { + LOG.debug("No task heartbeat was reported for " + idStr); // Task hasn't finish init int nowSecs = TimeUtils.current_time_secs(); int assignSecs = taskHB.getTaskAssignedTime(); - int waitInitTimeout = - JStormUtils.parseInt(data.getConf().get( - Config.NIMBUS_TASK_LAUNCH_SECS)); + int waitInitTimeout = JStormUtils.parseInt(data.getConf().get(Config.NIMBUS_TASK_LAUNCH_SECS)); if (nowSecs - assignSecs > waitInitTimeout) { LOG.info(idStr + " failed to init "); @@ -433,30 +368,29 @@ public class NimbusUtils { int nowSecs = TimeUtils.current_time_secs(); if (nimbusTime == 0) { // taskHB no entry, first time - // update taskHB + // update taskHBtaskReportTime taskHB.setNimbusTime(nowSecs); - taskHB.setTaskReportedTime(zkReportTime); + taskHB.setTaskReportedTime(taskReportTime); LOG.info("Update taskheartbeat to nimbus cache " + idStr); return false; } - if (reportTime != zkReportTime.intValue()) { + if (reportTime != taskReportTime.intValue()) { // zk has been updated the report time taskHB.setNimbusTime(nowSecs); - taskHB.setTaskReportedTime(zkReportTime); + taskHB.setTaskReportedTime(taskReportTime); - LOG.debug(idStr + ",nimbusTime " + nowSecs + ",zkReport:" - + zkReportTime + ",report:" + reportTime); + LOG.debug(idStr + ",nimbusTime " + nowSecs + ",zkReport:" + taskReportTime + ",report:" + reportTime); return false; } // the following is (zkReportTime == reportTime) Integer taskHBTimeout = data.getTopologyTaskTimeout().get(topologyId); if (taskHBTimeout == null) - taskHBTimeout = - JStormUtils.parseInt(data.getConf().get( - Config.NIMBUS_TASK_TIMEOUT_SECS)); + taskHBTimeout = JStormUtils.parseInt(data.getConf().get(Config.NIMBUS_TASK_TIMEOUT_SECS)); + if (taskId == topoTasksHbInfo.get_topologyMasterId()) + taskHBTimeout = (taskHBTimeout / 2); if (nowSecs - nimbusTime > taskHBTimeout) { // task is dead long ts = ((long) nimbusTime) * 1000; @@ -470,7 +404,7 @@ public class NimbusUtils { sb.append(",current "); sb.append(nowSecs); sb.append(":").append(new Date(((long) nowSecs) * 1000)); - LOG.info(sb.toString()); + LOG.debug(sb.toString()); return true; } @@ -478,13 +412,10 @@ public class NimbusUtils { } - public static void updateTaskHbStartTime(NimbusData data, - Assignment assignment, String topologyId) { - Map<Integer, TkHbCacheTime> taskHBs = - data.getTaskHeartbeatsCache(topologyId, true); + public static void updateTaskHbStartTime(NimbusData data, Assignment assignment, String topologyId) { + Map<Integer, TkHbCacheTime> taskHBs = data.getTaskHeartbeatsCache(topologyId, true); - Map<Integer, Integer> taskStartTimes = - assignment.getTaskStartTimeSecs(); + Map<Integer, Integer> 
taskStartTimes = assignment.getTaskStartTimeSecs(); for (Entry<Integer, Integer> entry : taskStartTimes.entrySet()) { Integer taskId = entry.getKey(); Integer taskStartTime = entry.getValue(); @@ -501,25 +432,19 @@ public class NimbusUtils { return; } - public static <T> void transitionName(NimbusData data, String topologyName, - boolean errorOnNoTransition, StatusType transition_status, - T... args) throws Exception { + public static <T> void transitionName(NimbusData data, String topologyName, boolean errorOnNoTransition, StatusType transition_status, T... args) + throws Exception { StormClusterState stormClusterState = data.getStormClusterState(); - String topologyId = - Cluster.get_topology_id(stormClusterState, topologyName); + String topologyId = Cluster.get_topology_id(stormClusterState, topologyName); if (topologyId == null) { throw new NotAliveException(topologyName); } - transition(data, topologyId, errorOnNoTransition, transition_status, - args); + transition(data, topologyId, errorOnNoTransition, transition_status, args); } - public static <T> void transition(NimbusData data, String topologyid, - boolean errorOnNoTransition, StatusType transition_status, - T... args) { + public static <T> void transition(NimbusData data, String topologyid, boolean errorOnNoTransition, StatusType transition_status, T... args) { try { - data.getStatusTransition().transition(topologyid, - errorOnNoTransition, transition_status, args); + data.getStatusTransition().transition(topologyid, errorOnNoTransition, transition_status, args); } catch (Exception e) { // TODO Auto-generated catch block LOG.error("Failed to do status transition,", e); @@ -535,22 +460,17 @@ public class NimbusUtils { return numTasks; } - public static List<TopologySummary> getTopologySummary( - StormClusterState stormClusterState, - Map<String, Assignment> assignments) throws Exception { - List<TopologySummary> topologySummaries = - new ArrayList<TopologySummary>(); + public static List<TopologySummary> getTopologySummary(StormClusterState stormClusterState, Map<String, Assignment> assignments) throws Exception { + List<TopologySummary> topologySummaries = new ArrayList<TopologySummary>(); // get all active topology's StormBase - Map<String, StormBase> bases = - Cluster.get_all_StormBase(stormClusterState); + Map<String, StormBase> bases = Cluster.get_all_StormBase(stormClusterState); for (Entry<String, StormBase> entry : bases.entrySet()) { String topologyId = entry.getKey(); StormBase base = entry.getValue(); - Assignment assignment = - stormClusterState.assignment_info(topologyId, null); + Assignment assignment = stormClusterState.assignment_info(topologyId, null); if (assignment == null) { LOG.error("Failed to get assignment of " + topologyId); continue; @@ -571,11 +491,10 @@ public class NimbusUtils { topology.set_id(topologyId); topology.set_name(base.getStormName()); topology.set_status(base.getStatusString()); - topology.set_uptime_secs(TimeUtils.time_delta(base - .getLanchTimeSecs())); - topology.set_num_workers(num_workers); - topology.set_num_tasks(num_tasks); - topology.set_error_info(errorString); + topology.set_uptimeSecs(TimeUtils.time_delta(base.getLanchTimeSecs())); + topology.set_numWorkers(num_workers); + topology.set_numTasks(num_tasks); + topology.set_errorInfo(errorString); topologySummaries.add(topology); @@ -584,34 +503,26 @@ public class NimbusUtils { return topologySummaries; } - public static SupervisorSummary mkSupervisorSummary( - SupervisorInfo supervisorInfo, String supervisorId, - Map<String, 
Integer> supervisorToUsedSlotNum) { + public static SupervisorSummary mkSupervisorSummary(SupervisorInfo supervisorInfo, String supervisorId, Map<String, Integer> supervisorToUsedSlotNum) { Integer usedNum = supervisorToUsedSlotNum.get(supervisorId); SupervisorSummary summary = - new SupervisorSummary(supervisorInfo.getHostName(), - supervisorId, supervisorInfo.getUptimeSecs(), - supervisorInfo.getWorkerPorts().size(), + new SupervisorSummary(supervisorInfo.getHostName(), supervisorId, supervisorInfo.getUptimeSecs(), supervisorInfo.getWorkerPorts().size(), usedNum == null ? 0 : usedNum); return summary; } - public static List<SupervisorSummary> mkSupervisorSummaries( - Map<String, SupervisorInfo> supervisorInfos, - Map<String, Assignment> assignments) { + public static List<SupervisorSummary> mkSupervisorSummaries(Map<String, SupervisorInfo> supervisorInfos, Map<String, Assignment> assignments) { - Map<String, Integer> supervisorToLeftSlotNum = - new HashMap<String, Integer>(); + Map<String, Integer> supervisorToLeftSlotNum = new HashMap<String, Integer>(); for (Entry<String, Assignment> entry : assignments.entrySet()) { Set<ResourceWorkerSlot> workers = entry.getValue().getWorkers(); for (ResourceWorkerSlot worker : workers) { String supervisorId = worker.getNodeId(); - SupervisorInfo supervisorInfo = - supervisorInfos.get(supervisorId); + SupervisorInfo supervisorInfo = supervisorInfos.get(supervisorId); if (supervisorInfo == null) { continue; } @@ -629,9 +540,7 @@ public class NimbusUtils { String supervisorId = entry.getKey(); SupervisorInfo supervisorInfo = entry.getValue(); - SupervisorSummary summary = - mkSupervisorSummary(supervisorInfo, supervisorId, - supervisorToLeftSlotNum); + SupervisorSummary summary = mkSupervisorSummary(supervisorInfo, supervisorId, supervisorToLeftSlotNum); ret.add(summary); } @@ -648,27 +557,24 @@ public class NimbusUtils { return ret; } - public static NimbusSummary getNimbusSummary( - StormClusterState stormClusterState, - List<SupervisorSummary> supervisorSummaries, NimbusData data) + public static NimbusSummary getNimbusSummary(StormClusterState stormClusterState, List<SupervisorSummary> supervisorSummaries, NimbusData data) throws Exception { NimbusSummary ret = new NimbusSummary(); String master = stormClusterState.get_leader_host(); NimbusStat nimbusMaster = new NimbusStat(); nimbusMaster.set_host(master); - nimbusMaster.set_uptime_secs(String.valueOf(data.uptime())); - ret.set_nimbus_master(nimbusMaster); + nimbusMaster.set_uptimeSecs(String.valueOf(data.uptime())); + ret.set_nimbusMaster(nimbusMaster); List<NimbusStat> nimbusSlaveList = new ArrayList<NimbusStat>(); - ret.set_nimbus_slaves(nimbusSlaveList); - Map<String, String> nimbusSlaveMap = - Cluster.get_all_nimbus_slave(stormClusterState); + ret.set_nimbusSlaves(nimbusSlaveList); + Map<String, String> nimbusSlaveMap = Cluster.get_all_nimbus_slave(stormClusterState); if (nimbusSlaveMap != null) { for (Entry<String, String> entry : nimbusSlaveMap.entrySet()) { NimbusStat slave = new NimbusStat(); slave.set_host(entry.getKey()); - slave.set_uptime_secs(entry.getValue()); + slave.set_uptimeSecs(entry.getValue()); nimbusSlaveList.add(slave); } @@ -678,46 +584,75 @@ public class NimbusUtils { int usedPort = 0; for (SupervisorSummary supervisor : supervisorSummaries) { - totalPort += supervisor.get_num_workers(); - usedPort += supervisor.get_num_used_workers(); + totalPort += supervisor.get_numWorkers(); + usedPort += supervisor.get_numUsedWorkers(); } - 
ret.set_supervisor_num(supervisorSummaries.size()); - ret.set_total_port_num(totalPort); - ret.set_used_port_num(usedPort); - ret.set_free_port_num(totalPort - usedPort); + ret.set_supervisorNum(supervisorSummaries.size()); + ret.set_totalPortNum(totalPort); + ret.set_usedPortNum(usedPort); + ret.set_freePortNum(totalPort - usedPort); ret.set_version(Utils.getVersion()); return ret; } - public static void updateTopologyTaskTimeout(NimbusData data, - String topologyId) { + public static void updateTopologyTaskTimeout(NimbusData data, String topologyId) { Map topologyConf = null; try { - topologyConf = - StormConfig.read_nimbus_topology_conf(data.getConf(), - topologyId); + topologyConf = StormConfig.read_nimbus_topology_conf(data.getConf(), topologyId); } catch (IOException e) { - LOG.warn("Failed to read configuration of " + topologyId + ", " - + e.getMessage()); + LOG.warn("Failed to read configuration of " + topologyId + ", " + e.getMessage()); } - Integer timeout = - JStormUtils.parseInt(topologyConf - .get(Config.NIMBUS_TASK_TIMEOUT_SECS)); + Integer timeout = JStormUtils.parseInt(topologyConf.get(Config.NIMBUS_TASK_TIMEOUT_SECS)); if (timeout == null) { - timeout = - JStormUtils.parseInt(data.getConf().get( - Config.NIMBUS_TASK_TIMEOUT_SECS)); + timeout = JStormUtils.parseInt(data.getConf().get(Config.NIMBUS_TASK_TIMEOUT_SECS)); } LOG.info("Setting taskTimeout:" + timeout + " for " + topologyId); data.getTopologyTaskTimeout().put(topologyId, timeout); } - public static void removeTopologyTaskTimeout(NimbusData data, - String topologyId) { + public static void removeTopologyTaskTimeout(NimbusData data, String topologyId) { data.getTopologyTaskTimeout().remove(topologyId); } + + public static void updateTopologyTaskHb(NimbusData data, String topologyId) { + StormClusterState clusterState = data.getStormClusterState(); + TopologyTaskHbInfo topologyTaskHb = null; + + try { + topologyTaskHb = clusterState.topology_heartbeat(topologyId); + } catch (Exception e) { + LOG.error("updateTopologyTaskHb: Failed to get topology task heartbeat info", e); + } + + if (topologyTaskHb != null) { + data.getTasksHeartbeat().put(topologyId, topologyTaskHb); + } + } + + public static void removeTopologyTaskHb(NimbusData data, String topologyId, int taskId) { + TopologyTaskHbInfo topologyTaskHbs = data.getTasksHeartbeat().get(topologyId); + + if (topologyTaskHbs != null) { + Map<Integer, TaskHeartbeat> taskHbs = topologyTaskHbs.get_taskHbs(); + if (taskHbs != null) { + taskHbs.remove(taskId); + } + } + } + + public static int getTopologyMasterId(Map<Integer, TaskInfo> tasksInfo) { + int ret = 0; + for (Entry<Integer, TaskInfo> entry : tasksInfo.entrySet()) { + if (entry.getValue().getComponentId().equalsIgnoreCase(Common.TOPOLOGY_MASTER_COMPONENT_ID)) { + ret = entry.getKey(); + break; + } + } + + return ret; + } }
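Editor's note on the NimbusUtils diff above: two behavioral points are easy to miss in the noise. First, isTaskDead() now times out the topology master task with half of nimbus.task.timeout.secs, since every other task's heartbeat is relayed through it. Second, the new getTopologyMasterId() helper locates that task by scanning the TaskInfo map for Common.TOPOLOGY_MASTER_COMPONENT_ID. The following self-contained sketch illustrates only the halved-timeout rule, assuming the caller already knows the master's task id; the class and method names are illustrative and not part of the patch.

/**
 * Mirrors the timeout rule added to NimbusUtils.isTaskDead(): the topology
 * master task is declared dead after half the configured task timeout,
 * every other task after the full timeout.
 */
public class TaskTimeoutRule {

    public static boolean isTimedOut(int nowSecs, int lastNimbusUpdateSecs,
                                     int taskId, int topologyMasterId, int taskTimeoutSecs) {
        int timeout = (taskId == topologyMasterId) ? taskTimeoutSecs / 2 : taskTimeoutSecs;
        return nowSecs - lastNimbusUpdateSecs > timeout;
    }

    public static void main(String[] args) {
        // With a 120s task timeout, 70s of silence kills the topology master
        // (effective timeout 60s) but not an ordinary task.
        System.out.println(isTimedOut(1070, 1000, 1, 1, 120)); // true
        System.out.println(isTimedOut(1070, 1000, 7, 1, 120)); // false
    }
}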
