http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable.java index b8eeb20..7af67c2 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * <p/> * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -17,711 +17,895 @@ */ package com.alibaba.jstorm.daemon.nimbus; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.BlockingDeque; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.alibaba.jstorm.cache.JStormCache; +import backtype.storm.generated.MetricInfo; +import backtype.storm.generated.MetricSnapshot; +import backtype.storm.generated.TopologyMetric; +import backtype.storm.utils.Utils; +import com.alibaba.jstorm.callback.AsyncLoopThread; import com.alibaba.jstorm.callback.RunnableCallback; import com.alibaba.jstorm.client.ConfigExtension; import com.alibaba.jstorm.cluster.Cluster; -import com.alibaba.jstorm.cluster.Common; import com.alibaba.jstorm.cluster.StormClusterState; -import com.alibaba.jstorm.cluster.StormConfig; -import com.alibaba.jstorm.common.metric.Histogram; -import com.alibaba.jstorm.metric.AlimonitorClient; -import com.alibaba.jstorm.metric.MetricDef; -import com.alibaba.jstorm.metric.MetricSendClient; -import com.alibaba.jstorm.metric.MetricThrift; -import com.alibaba.jstorm.metric.SimpleJStormMetric; +import com.alibaba.jstorm.common.metric.AsmGauge; +import com.alibaba.jstorm.common.metric.MetricMeta; +import com.alibaba.jstorm.daemon.nimbus.metric.uploader.DefaultMetricUploader; +import com.alibaba.jstorm.daemon.nimbus.metric.uploader.MetricUploader; +import com.alibaba.jstorm.metric.*; import com.alibaba.jstorm.schedule.Assignment; import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot; -import com.alibaba.jstorm.utils.TimeCacheMap; +import 
com.alibaba.jstorm.utils.JStormUtils; +import com.alibaba.jstorm.utils.TimeUtils; import com.codahale.metrics.Gauge; +import com.google.common.collect.Sets; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import backtype.storm.generated.MetricInfo; -import backtype.storm.generated.MetricWindow; -import backtype.storm.generated.TopologyMetric; -import backtype.storm.generated.WorkerUploadMetrics; +import java.io.Serializable; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerArray; -public class TopologyMetricsRunnable extends RunnableCallback { +/** + * Topology metrics thread which resides in nimbus. + * This class is responsible for generating metrics IDs and uploading metrics to the underlying storage system. 
+ * + * @author Cody ([email protected]) + * @since 2.0.5 + */ +public class TopologyMetricsRunnable extends Thread { private static final Logger LOG = LoggerFactory.getLogger(TopologyMetricsRunnable.class); - private static final String DEAD_SUPERVISOR_HEAD = "DeadSupervisor-"; - - public static interface Event { - - } - - public static class Update implements Event { - public WorkerUploadMetrics workerMetrics; - } - - public static class Remove implements Event { - public String topologyId; - } - - public static class Upload implements Event { - public long timeStamp; - } - - public static final String CACHE_NAMESPACE_METRIC = "cache_namespace_metric"; - public static final String CACHE_NAMESPACE_NETTY = "cache_namespace_netty"; - protected NimbusCache nimbusCache; - protected JStormCache dbCache; - + + protected JStormMetricCache metricCache; + /** - * cache all worker metrics will waste a little memory - * + * map<topologyId, map<worker, metricInfo>>, local memory cache, keeps only one snapshot of metrics. 
*/ - protected Map<String, Set<String>> topologyWorkers; - protected TimeCacheMap<String, Long> removing; - - protected BlockingDeque<TopologyMetricsRunnable.Event> queue; + protected final ConcurrentMap<String, TopologyMetricContext> topologyMetricContexts = + new ConcurrentHashMap<>(); + + protected final BlockingDeque<TopologyMetricsRunnable.Event> queue = new LinkedBlockingDeque<>(); + + private static final String PENDING_UPLOAD_METRIC_DATA = "__pending.upload.metrics__"; + private static final String PENDING_UPLOAD_METRIC_DATA_INFO = "__pending.upload.metrics.info__"; + + // the slot is empty + private static final int UNSET = 0; + // the slot is ready for uploading + private static final int SET = 1; + // the slot is being uploaded + private static final int UPLOADING = 2; + // the slot will be set ready for uploading + private static final int PRE_SET = 3; + + protected final AtomicIntegerArray metricStat; + protected StormClusterState stormClusterState; - - protected MetricSendClient metricSendClient; - protected TopologyMetric emptyTopologyMetric = mkTopologyMetric(); - protected TreeMap<String, MetricInfo> emptyNettyMetric = new TreeMap<String, MetricInfo>(); + + protected MetricUploader metricUploader; + protected AtomicBoolean isShutdown; - protected boolean localMode; - protected TopologyNettyMgr topologyNettyMgr; - - protected Histogram updateHistogram; - protected AtomicBoolean isUploading = new AtomicBoolean(false); - protected Histogram uploadHistogram; - - public TopologyMetricsRunnable(NimbusData nimbusData) { - - this.nimbusCache = nimbusData.getNimbusCache(); - this.dbCache = nimbusCache.getDbCache(); - this.topologyWorkers = new ConcurrentHashMap<String, Set<String>>(); - this.removing = new TimeCacheMap<String, Long>(600); - this.queue = new LinkedBlockingDeque<TopologyMetricsRunnable.Event>(); + protected String clusterName; + protected int maxPendingUploadMetrics; + + private final boolean localMode; + private final NimbusData nimbusData; 
+ private MetricQueryClient metricQueryClient; + + private ScheduledExecutorService clusterMetricsUpdateExecutor; + + /** + * refreshes alive topologies every min or on startup. + */ + protected AsyncLoopThread refreshTopologiesThread; + + /** + * the thread for metric sending, checks every second. + */ + private final Thread uploadThread = new MetricsUploadThread(); + + /** + * async flush metric meta + */ + private final Thread flushMetricMetaThread = new FlushMetricMetaThread(); + + /** + * use default UUID generator + */ + private final MetricIDGenerator metricIDGenerator = new DefaultMetricIDGenerator(); + + public TopologyMetricsRunnable(final NimbusData nimbusData) { + setName(getClass().getSimpleName()); + + this.nimbusData = nimbusData; + + this.localMode = nimbusData.isLocalMode(); + if (localMode) { + this.metricStat = new AtomicIntegerArray(1); + return; + } + + LOG.info("create topology metrics runnable."); + this.metricCache = nimbusData.getMetricCache(); this.stormClusterState = nimbusData.getStormClusterState(); this.isShutdown = nimbusData.getIsShutdown(); - this.topologyNettyMgr = nimbusData.getTopologyNettyMgr(); - - if (ConfigExtension.isAlimonitorMetricsPost(nimbusData.getConf())) { - metricSendClient = new AlimonitorClient(AlimonitorClient.DEFAUT_ADDR, AlimonitorClient.DEFAULT_PORT, true); - } else { - metricSendClient = new MetricSendClient(); - } - localMode = StormConfig.local_mode(nimbusData.getConf()); - - updateHistogram = SimpleJStormMetric.registerHistorgram("TopologyMetricsRunnable_Update"); - uploadHistogram = SimpleJStormMetric.registerHistorgram("TopologyMetricsRunnable_Upload"); - - SimpleJStormMetric.registerWorkerGauge(new Gauge<Double>() { - + + clusterName = ConfigExtension.getClusterName(nimbusData.getConf()); + if (clusterName == null) { + throw new RuntimeException("cluster.name property must be set in storm.yaml!"); + } + + this.maxPendingUploadMetrics = ConfigExtension.getMaxPendingMetricNum(nimbusData.getConf()); + 
this.metricStat = new AtomicIntegerArray(this.maxPendingUploadMetrics); + + int cnt = 0; + for (int i = 0; i < maxPendingUploadMetrics; i++) { + TopologyMetricDataInfo obj = getMetricDataInfoFromCache(i); + if (obj != null) { + this.metricStat.set(i, SET); + cnt++; + } + } + LOG.info("pending upload metrics: {}", cnt); + + // init alive topologies from zk + this.refreshTopologies(); + this.refreshTopologiesThread = new AsyncLoopThread(new RefreshTopologiesThread()); + + this.clusterMetricsUpdateExecutor = Executors.newSingleThreadScheduledExecutor(); + this.clusterMetricsUpdateExecutor.scheduleAtFixedRate(new Runnable() { @Override - public Double getValue() { - // TODO Auto-generated method stub - return (double) queue.size(); + public void run() { + int secOffset = TimeUtils.secOffset(); + int offset = 55; + if (secOffset < offset) { + JStormUtils.sleepMs((offset - secOffset) * 1000); + } else if (secOffset == offset) { + // do nothing + } else { + JStormUtils.sleepMs((60 - secOffset + offset) * 1000); + } + + LOG.info("cluster metrics force upload."); + mergeAndUploadClusterMetrics(); } - }, "TopologyMetricsRunnable_Queue"); + }, 5, 60, TimeUnit.SECONDS); + + // track nimbus JVM heap + JStormMetrics.registerWorkerGauge(JStormMetrics.NIMBUS_METRIC_KEY, MetricDef.MEMORY_USED, + new AsmGauge(new Gauge<Double>() { + @Override + public Double getValue() { + MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + MemoryUsage memoryUsage = memoryMXBean.getHeapMemoryUsage(); + return (double) memoryUsage.getUsed(); + } + })); } - - public void pushEvent(TopologyMetricsRunnable.Event cmd) { - queue.offer(cmd); + + /** + * init metric uploader + */ + public void init() { + String metricUploadClass = ConfigExtension.getMetricUploaderClass(nimbusData.getConf()); + if (StringUtils.isBlank(metricUploadClass)) { + metricUploadClass = DefaultMetricUploader.class.getName(); + } + // init metric uploader + LOG.info("metric uploader class:{}", metricUploadClass); + 
Object instance = Utils.newInstance(metricUploadClass); + if (!(instance instanceof MetricUploader)) { + throw new RuntimeException(metricUploadClass + " isn't MetricUploader class "); + } + this.metricUploader = (MetricUploader) instance; + try { + metricUploader.init(nimbusData); + } catch (Exception e) { + throw new RuntimeException(e); + } + LOG.info("Successfully init {}", metricUploadClass); + + // init metric query client + String metricQueryClientClass = ConfigExtension.getMetricQueryClientClass(nimbusData.getConf()); + if (!StringUtils.isBlank(metricQueryClientClass)) { + LOG.info("metric query client class:{}", metricQueryClientClass); + this.metricQueryClient = (MetricQueryClient) Utils.newInstance(metricQueryClientClass); + } else { + LOG.warn("use default metric query client class."); + this.metricQueryClient = new DefaultMetricQueryClient(); + } + try { + metricQueryClient.init(nimbusData.getConf()); + } catch (Exception e) { + throw new RuntimeException(e); + } + + this.uploadThread.start(); + this.flushMetricMetaThread.start(); + + LOG.info("init topology metric runnable done."); + } + + public void shutdown() { + LOG.info("Begin to shutdown"); + metricUploader.cleanup(); + + LOG.info("Successfully shutdown"); } - - public TopologyMetric mkTopologyMetric() { - TopologyMetric emptyTopologyMetric = new TopologyMetric(); - - MetricInfo topologyMetricInfo = MetricThrift.mkMetricInfo(); - emptyTopologyMetric.set_topologyMetric(topologyMetricInfo); - - emptyTopologyMetric.set_componentMetric(new HashMap<String, MetricInfo>()); - emptyTopologyMetric.set_workerMetric(new HashMap<String, MetricInfo>()); - emptyTopologyMetric.set_taskMetric(new HashMap<Integer, MetricInfo>()); - return emptyTopologyMetric; - } - + @Override public void run() { - try { - TopologyMetricsRunnable.Event event = queue.take(); - - if (event instanceof Remove) { - - handleRemoveEvent((Remove) event); - return; - } else if (event instanceof Update) { - handleUpdateEvent((Update) 
event); - return; - } else if (event instanceof Upload) { - handleUploadEvent((Upload) event); - return; - } else { - LOG.error("Unknow event type"); + while (!isShutdown.get()) { + if (localMode) { return; } - - } catch (Exception e) { - if (isShutdown.get() == false) { - LOG.error(e.getMessage(), e); + + try { + // wait for metricUploader to be ready, for some external plugin like database, it'll take a few seconds + if (this.metricUploader != null) { + Event event = queue.poll(); + if (event == null) { + continue; + } + + if (event instanceof Remove) { + handleRemoveEvent((Remove) event); + } else if (event instanceof Update) { + handleUpdateEvent((Update) event); + } else if (event instanceof Refresh) { + handleRefreshEvent((Refresh) event); + } else if (event instanceof KillTopologyEvent) { + handleKillTopologyEvent((KillTopologyEvent) event); + } else if (event instanceof StartTopologyEvent) { + handleStartTopologyEvent((StartTopologyEvent) event); + } else if (event instanceof TaskDeadEvent) { + handleTaskDeadEvent((TaskDeadEvent) event); + } else if (event instanceof TaskStartEvent) { + handleTaskStartEvent((TaskStartEvent) event); + } else { + LOG.error("Unknown event type:{}", event.getClass()); + } + } + } catch (Exception e) { + if (!isShutdown.get()) { + LOG.error(e.getMessage(), e); + } } } } - - public void handleRemoveEvent(Remove event) { - String topologyId = event.topologyId; - TopologyMetric topologyMetric = (TopologyMetric) dbCache.get(getTopologyKey(topologyId)); - if (topologyMetric == null) { - LOG.warn("No TopologyMetric of " + topologyId); - return; + + + public boolean isTopologyAlive(String topologyId) { + return topologyMetricContexts.containsKey(topologyId); + } + + private int getAndPresetFirstEmptyIndex() { + for (int i = 0; i < maxPendingUploadMetrics; i++) { + if (metricStat.get(i) == UNSET) { + if (metricStat.compareAndSet(i, UNSET, PRE_SET)) { + return i; + } + } } - - removing.put(topologyId, System.currentTimeMillis()); - 
dbCache.remove(getTopologyKey(topologyId)); - dbCache.remove(getNettyTopologyKey(topologyId)); - topologyNettyMgr.rmTopology(topologyId); - LOG.info("Successfully remove TopologyMetric of " + topologyId); - return; - - } - - public void cleanDeadSupervisorWorker(TopologyMetric metric) { - List<String> removeList = new ArrayList<String>(); - - Map<String, MetricInfo> workerMetric = metric.get_workerMetric(); - if (workerMetric == null) { - return; + return -1; + } + + private int getFirstPendingUploadIndex() { + for (int i = 0; i < maxPendingUploadMetrics; i++) { + if (metricStat.get(i) == SET) { + return i; + } } - for (String hostPort : workerMetric.keySet()) { - if (hostPort.startsWith(DEAD_SUPERVISOR_HEAD)) { - removeList.add(hostPort); + return -1; + } + + public void markUploaded(int idx) { + this.metricCache.remove(PENDING_UPLOAD_METRIC_DATA + idx); + this.metricCache.remove(PENDING_UPLOAD_METRIC_DATA_INFO + idx); + this.metricStat.set(idx, UNSET); + } + + public void markUploading(int idx) { + this.metricStat.set(idx, UPLOADING); + } + + public void markSet(int idx) { + this.metricStat.set(idx, SET); + } + + public TopologyMetric getMetricDataFromCache(int idx) { + return (TopologyMetric) metricCache.get(PENDING_UPLOAD_METRIC_DATA + idx); + } + + public TopologyMetricDataInfo getMetricDataInfoFromCache(int idx) { + return (TopologyMetricDataInfo) metricCache.get(PENDING_UPLOAD_METRIC_DATA_INFO + idx); + } + + public void pushEvent(Event cmd) { + queue.offer(cmd); + } + + public Map<String, Long> registerMetrics(String topologyId, Set<String> metricNames) { + TimeTicker ticker = new TimeTicker(TimeUnit.MILLISECONDS, true); + + ConcurrentMap<String, Long> memMeta = topologyMetricContexts.get(topologyId).getMemMeta(); + Map<String, Long> ret = new HashMap<>(); + for (String metricName : metricNames) { + Long id = memMeta.get(metricName); + if (id != null && MetricUtils.isValidId(id)) { + ret.put(metricName, id); + } else { + id = 
metricIDGenerator.genMetricId(metricName); + Long old = memMeta.putIfAbsent(metricName, id); + if (old == null) { + ret.put(metricName, id); + } else { + ret.put(metricName, old); + } } } - - for (String removed : removeList) { - workerMetric.remove(removed); + + long cost = ticker.stop(); + LOG.info("register metrics, topology:{}, size:{}, cost:{}", topologyId, metricNames.size(), cost); + + return ret; + } + + public void handleRemoveEvent(Remove event) { + String topologyId = event.topologyId; + if (topologyId != null) { + removeTopology(topologyId); } + LOG.info("remove topology:{}.", topologyId); + } - - public void cleanTopology() { - Map<String, Long> removingMap = removing.buildMap(); - - Map<String, Assignment> assignMap = null; + + private void removeTopology(String topologyId) { + metricCache.removeTopology(topologyId); + metricCache.removeSampleRate(topologyId); + + topologyMetricContexts.remove(topologyId); + } + + + public void refreshTopologies() { + if (!topologyMetricContexts.containsKey(JStormMetrics.NIMBUS_METRIC_KEY)) { + LOG.info("adding __nimbus__ to metric context."); + Set<ResourceWorkerSlot> workerSlot = Sets.newHashSet(new ResourceWorkerSlot()); + TopologyMetricContext metricContext = new TopologyMetricContext(workerSlot); + topologyMetricContexts.putIfAbsent(JStormMetrics.NIMBUS_METRIC_KEY, metricContext); + syncMetaFromCache(JStormMetrics.NIMBUS_METRIC_KEY, topologyMetricContexts.get(JStormMetrics.NIMBUS_METRIC_KEY)); + } + if (!topologyMetricContexts.containsKey(JStormMetrics.CLUSTER_METRIC_KEY)) { + LOG.info("adding __cluster__ to metric context."); + Set<ResourceWorkerSlot> workerSlot = Sets.newHashSet(new ResourceWorkerSlot()); + Map conf = new HashMap(); + //there's no need to consider sample rate when cluster metrics merge + conf.put(ConfigExtension.TOPOLOGY_METRIC_SAMPLE_RATE, 1.0); + TopologyMetricContext metricContext = new TopologyMetricContext( + JStormMetrics.CLUSTER_METRIC_KEY, workerSlot, conf); + 
topologyMetricContexts.putIfAbsent(JStormMetrics.CLUSTER_METRIC_KEY, metricContext); + syncMetaFromCache(JStormMetrics.CLUSTER_METRIC_KEY, topologyMetricContexts.get(JStormMetrics.CLUSTER_METRIC_KEY)); + } + + Map<String, Assignment> assignMap; try { assignMap = Cluster.get_all_assignment(stormClusterState, null); + for (String topologyId : assignMap.keySet()) { + if (!topologyMetricContexts.containsKey(topologyId)) { + Assignment assignment = assignMap.get(topologyId); + TopologyMetricContext metricContext = + new TopologyMetricContext(assignment.getWorkers()); + metricContext.setTaskNum(NimbusUtils.getTopologyTaskNum(assignment)); + syncMetaFromCache(topologyId, metricContext); + + LOG.info("adding {} to metric context.", topologyId); + topologyMetricContexts.put(topologyId, metricContext); + } + } } catch (Exception e1) { - // TODO Auto-generated catch block - LOG.info("Failed to get Assignments"); + LOG.warn("failed to get assignments"); + return; } - - for (String topologyId : topologyWorkers.keySet()) { - if (assignMap.containsKey(topologyId) == false) { - removingMap.put(topologyId, System.currentTimeMillis()); + + List<String> removing = new ArrayList<>(); + for (String topologyId : topologyMetricContexts.keySet()) { + if (!JStormMetrics.NIMBUS_METRIC_KEY.equals(topologyId) + && !JStormMetrics.CLUSTER_METRIC_KEY.equals(topologyId) + && !assignMap.containsKey(topologyId)) { + removing.add(topologyId); } } - - for (String topologyId : removingMap.keySet()) { - dbCache.remove(getTopologyKey(topologyId)); - - Set<String> workers = topologyWorkers.get(topologyId); - if (workers != null) { - for (String workerSlot : workers) { - dbCache.remove(getWorkerKey(topologyId, workerSlot)); + + for (String topologyId : removing) { + LOG.info("removing topology:{}", topologyId); + removeTopology(topologyId); + } + } + + /** + * sync topology metric meta from external storage like TDDL/OTS. 
+ * nimbus server will skip syncing, only followers do this + */ + public void syncTopologyMeta() { + String nimbus = JStormMetrics.NIMBUS_METRIC_KEY; + if (topologyMetricContexts.containsKey(nimbus)) { + syncMetaFromRemote(nimbus, topologyMetricContexts.get(nimbus)); + } + String cluster = JStormMetrics.CLUSTER_METRIC_KEY; + if (topologyMetricContexts.containsKey(cluster)) { + syncMetaFromRemote(cluster, topologyMetricContexts.get(cluster)); + } + + Map<String, Assignment> assignMap; + try { + assignMap = Cluster.get_all_assignment(stormClusterState, null); + for (String topologyId : assignMap.keySet()) { + if (topologyMetricContexts.containsKey(topologyId)) { + Assignment assignment = assignMap.get(topologyId); + TopologyMetricContext metricContext = + new TopologyMetricContext(assignment.getWorkers()); + metricContext.setTaskNum(NimbusUtils.getTopologyTaskNum(assignment)); + + syncMetaFromCache(topologyId, metricContext); + syncMetaFromRemote(topologyId, metricContext); } - topologyWorkers.remove(topologyId); - } - - } - - for (Entry<String, Set<String>> entry : topologyWorkers.entrySet()) { - String topologyId = entry.getKey(); - Set<String> metricWorkers = entry.getValue(); - - Set<String> workerSlots = new HashSet<String>(); - - Assignment assignment = assignMap.get(topologyId); - if (assignment == null) { - LOG.error("Assignment disappear of " + topologyId); - continue; } - - for (ResourceWorkerSlot worker : assignment.getWorkers()) { - String slot = getWorkerSlotName(worker.getNodeId(), worker.getPort()); - workerSlots.add(slot); + } catch (Exception e1) { + LOG.warn("failed to get assignments"); + } + } + + /** + * sync metric meta from rocks db into mem cache on startup + */ + private void syncMetaFromCache(String topologyId, TopologyMetricContext context) { + if (!context.syncMeta()) { + Map<String, Long> meta = metricCache.getMeta(topologyId); + if (meta != null) { + context.getMemMeta().putAll(meta); } - - Set<String> removes = new HashSet<String>(); - 
for (String slot : metricWorkers) { - if (workerSlots.contains(slot) == false) { - LOG.info("Remove worker metrics of {}:{}", topologyId, slot); - removes.add(slot); + context.setSyncMeta(true); + } + } + + private void syncMetaFromRemote(String topologyId, TopologyMetricContext context) { + try { + int memSize = context.getMemMeta().size(); + int zkSize = (Integer) stormClusterState.get_topology_metric(topologyId); + + if (memSize != zkSize) { + ConcurrentMap<String, Long> memMeta = context.getMemMeta(); + for (MetaType metaType : MetaType.values()) { + List<MetricMeta> metaList = metricQueryClient.getMetricMeta(clusterName, topologyId, metaType); + if (metaList != null) { + LOG.info("get remote metric meta, topology:{}, metaType:{}, mem:{}, zk:{}, new size:{}", + topologyId, metaType, memSize, zkSize, metaList.size()); + for (MetricMeta meta : metaList) { + memMeta.putIfAbsent(meta.getFQN(), meta.getId()); + } + } } + metricCache.putMeta(topologyId, memMeta); } - - for (String slot : removes) { - metricWorkers.remove(slot); - dbCache.remove(getWorkerKey(topologyId, slot)); - } + } catch (Exception ex) { + LOG.error("failed to sync remote meta", ex); } } - + /** - * Upload metric to ZK - * - * @param event + * send topology track to jstorm monitor */ - public void handleUploadEvent(Upload event) { - if (isUploading.getAndSet(true) == true) { - LOG.info("Nimbus is alread uploading"); - return ; - } - - long start = System.currentTimeMillis(); - - cleanTopology(); - - render(); - - isUploading.set(false); - - long end = System.currentTimeMillis(); - uploadHistogram.update(end - start); - - - } - - public String getWorkerHostname(WorkerUploadMetrics workerMetrics) { - - String hostname = null; - String supervisorId = workerMetrics.get_supervisor_id(); - try { - hostname = Cluster.get_supervisor_hostname(stormClusterState, supervisorId); - } catch (Exception e) { - // TODO Auto-generated catch block - LOG.warn("Failed to get hostname of " + supervisorId); + protected 
void handleKillTopologyEvent(KillTopologyEvent event) { + metricUploader.sendEvent(this.clusterName, event); + removeTopology(event.topologyId); + } + + private void handleStartTopologyEvent(StartTopologyEvent event) { + this.metricCache.putSampleRate(event.topologyId, event.sampleRate); + metricUploader.sendEvent(this.clusterName, event); + if (!topologyMetricContexts.containsKey(event.topologyId)) { + TopologyMetricContext metricContext = new TopologyMetricContext(); + // note that workerNum is not set here. + this.topologyMetricContexts.put(event.topologyId, metricContext); } - if (hostname == null) { - hostname = DEAD_SUPERVISOR_HEAD + supervisorId; + } + + private void handleTaskDeadEvent(TaskDeadEvent event) { + metricUploader.sendEvent(this.clusterName, event); + + // unregister dead workers + Set<ResourceWorkerSlot> workers = new HashSet<>(); + workers.addAll(event.deadTasks.values()); + for (ResourceWorkerSlot worker : workers) { + metricCache.unregisterWorker(event.topologyId, worker.getHostname(), worker.getPort()); } - - return hostname; } - - public void avgMetricWindow(MetricWindow metric, int parallel) { - if (parallel == 0) { - return; + + private void handleTaskStartEvent(final TaskStartEvent event) { + Assignment assignment = event.newAssignment; + TopologyMetricContext metricContext = topologyMetricContexts.get(event.topologyId); + if (metricContext != null) { + metricContext.setWorkerSet(assignment.getWorkers()); + } else { + metricContext = new TopologyMetricContext(); + metricContext.setWorkerSet(assignment.getWorkers()); + topologyMetricContexts.put(event.topologyId, metricContext); } - Map<Integer, Double> map = metric.get_metricWindow(); - Map<Integer, Double> newMap = new HashMap<Integer, Double>(); - if (map != null) { - for (Entry<Integer, Double> entry : map.entrySet()) { - newMap.put(entry.getKey(), entry.getValue() / parallel); + metricUploader.sendEvent(this.clusterName, event); + } + + /** + * merge and send all metric data. 
+ */ + public void handleRefreshEvent(Refresh dummy) { + TimeTicker ticker = new TimeTicker(TimeUnit.MILLISECONDS, true); + try { + refreshTopologies(); + LOG.info("refresh topologies, cost:{}", ticker.stopAndRestart()); + if (!nimbusData.isLeader()) { + syncTopologyMeta(); + LOG.info("sync topology meta, cost:{}", ticker.stop()); } + } catch (Exception ex) { + LOG.error("handleRefreshEvent error:", ex); } - - metric.set_metricWindow(newMap); - } - - public MetricInfo mergeMetricInfo(MetricInfo from, MetricInfo to, Set<String> tags) { - if (to == null) { - to = MetricThrift.mkMetricInfo(); - } - - if (from.get_baseMetric() == null) { - LOG.warn("No base Metric "); - return to; - } - - for (String tag : tags) { - - MetricWindow fromMetric = from.get_baseMetric().get(tag); - Map<String, MetricWindow> toMetricMap = to.get_baseMetric(); - if (toMetricMap == null) { - toMetricMap = new HashMap<String, MetricWindow>(); - to.set_baseMetric(toMetricMap); - } - - MetricWindow toMetric = toMetricMap.get(tag); - - toMetric = MetricThrift.mergeMetricWindow(fromMetric, toMetric); - - toMetricMap.put(tag, toMetric); - - } - - return to; - } - - public Map<String, Map<String, MetricWindow>> mergeTaskStreams( - Map<String, Map<String, MetricWindow>> componentStreams, - Map<String, Map<String, MetricWindow>> taskStreams, - Map<String, Map<String, AtomicInteger>> componentStreamParallel) { - - if (taskStreams == null || taskStreams.size() == 0) { - return componentStreams; - } - - if (componentStreams == null) { - componentStreams = new HashMap<String, Map<String, MetricWindow>>(); - } - - for (Entry<String, Map<String, MetricWindow>> entry : taskStreams.entrySet()) { + } + + private TopologyMetricContext getClusterTopologyMetricContext() { + return topologyMetricContexts.get(JStormMetrics.CLUSTER_METRIC_KEY); + } + + private void mergeAndUploadClusterMetrics() { + TopologyMetricContext context = getClusterTopologyMetricContext(); + TopologyMetric tpMetric = context.mergeMetrics(); 
+ if (tpMetric == null) { + tpMetric = MetricUtils.mkTopologyMetric(); + tpMetric.set_topologyMetric(MetricUtils.mkMetricInfo()); + } + + //reset snapshots metric id + MetricInfo clusterMetrics = tpMetric.get_topologyMetric(); + Map<String, Long> metricNames = context.getMemMeta(); + for (Map.Entry<String, Map<Integer, MetricSnapshot>> entry : clusterMetrics.get_metrics().entrySet()) { String metricName = entry.getKey(); - Map<String, MetricWindow> streamMetricWindows = entry.getValue(); - - if (streamMetricWindows == null) { - continue; - } - - Map<String, AtomicInteger> streamCounters = componentStreamParallel.get(metricName); - if (streamCounters == null) { - streamCounters = new HashMap<String, AtomicInteger>(); - componentStreamParallel.put(metricName, streamCounters); + MetricType metricType = MetricUtils.metricType(metricName); + Long metricId = metricNames.get(metricName); + for (Map.Entry<Integer, MetricSnapshot> metric : entry.getValue().entrySet()) { + MetricSnapshot snapshot = metric.getValue(); + snapshot.set_metricId(metricId); + if (metricType == MetricType.HISTOGRAM || metricType == MetricType.TIMER) { + snapshot.set_points(new ArrayList<Long>(0)); + } +// entry.getValue().put(metric.getKey(), snapshot); } - - Map<String, MetricWindow> componentStreamMetricWindows = componentStreams.get(metricName); - if (componentStreamMetricWindows == null) { - componentStreamMetricWindows = new HashMap<String, MetricWindow>(); - componentStreams.put(metricName, componentStreamMetricWindows); + } + + //fill the unacquired metrics with zero + long ts = System.currentTimeMillis(); + for (Map.Entry<String, Long> entry : metricNames.entrySet()) { + String name = entry.getKey(); + if (!clusterMetrics.get_metrics().containsKey(name)) { + Map<Integer, MetricSnapshot> metric = new HashMap<>(); + MetricType type = MetricUtils.metricType(name); + metric.put(AsmWindow.M1_WINDOW, new MetricSnapshot(entry.getValue(), ts, type.getT())); + clusterMetrics.put_to_metrics(name, 
metric); } - - for (Entry<String, MetricWindow> streamEntry : streamMetricWindows.entrySet()) { - String streamName = streamEntry.getKey(); - MetricWindow taskMetricWindow = streamEntry.getValue(); - - MetricWindow componentMetricWindow = componentStreamMetricWindows.get(streamName); - - componentMetricWindow = MetricThrift.mergeMetricWindow(taskMetricWindow, componentMetricWindow); - - componentStreamMetricWindows.put(streamName, componentMetricWindow); - - AtomicInteger counter = streamCounters.get(streamName); - if (counter == null) { - counter = new AtomicInteger(0); - streamCounters.put(streamName, counter); + } + + //upload to cache + Update event = new Update(); + event.timestamp = System.currentTimeMillis(); + event.topologyMetrics = tpMetric; + event.topologyId = JStormMetrics.CLUSTER_METRIC_KEY; + pushEvent(event); + + LOG.info("send update event for cluster metrics, size : {}", clusterMetrics.get_metrics_size()); + } + + //update cluster metrics local cache + private void updateClusterMetrics(String topologyId, TopologyMetric tpMetric) { + if (tpMetric.get_topologyMetric().get_metrics_size() > 0) { + TopologyMetricContext context = getClusterTopologyMetricContext(); + MetricInfo topologyMetrics = tpMetric.get_topologyMetric(); + // make a new MetricInfo to save the topologyId's metric + MetricInfo clusterMetrics = MetricUtils.mkMetricInfo(); + Set<String> metricNames = new HashSet<>(); + for (Map.Entry<String, Map<Integer, MetricSnapshot>> entry : topologyMetrics.get_metrics().entrySet()) { + String metricName = MetricUtils.topo2clusterName(entry.getKey()); + MetricType metricType = MetricUtils.metricType(metricName); + Map<Integer, MetricSnapshot> winData = new HashMap<>(); + for (Map.Entry<Integer, MetricSnapshot> entryData : entry.getValue().entrySet()) { + MetricSnapshot snapshot = entryData.getValue().deepCopy(); + winData.put(entryData.getKey(), snapshot); + if (metricType == MetricType.HISTOGRAM || metricType == MetricType.TIMER) { + // reset 
topology metric points + entryData.getValue().set_points(new ArrayList<Long>(0)); + } } - counter.incrementAndGet(); + clusterMetrics.put_to_metrics(metricName, winData); + metricNames.add(metricName); } + // save to local cache, waiting for merging + context.addToMemCache(topologyId, clusterMetrics); + registerMetrics(JStormMetrics.CLUSTER_METRIC_KEY, metricNames); } - - return componentStreams; } - - public void avgStreams(Map<String, Map<String, MetricWindow>> tagStreamsMetrics, Map<String, Map<String, AtomicInteger>> counters, String tag) { - if (tagStreamsMetrics == null) { - return; - } - - Map<String, MetricWindow> streamMetrics = tagStreamsMetrics.get(tag); - if (streamMetrics == null) { - return; + + /** + * put metric data to metric cache. + */ + public void handleUpdateEvent(Update event) { + TopologyMetric topologyMetrics = event.topologyMetrics; + final String topologyId = event.topologyId; + + if (this.topologyMetricContexts.containsKey(topologyId)) { + if (!JStormMetrics.CLUSTER_METRIC_KEY.equals(topologyId)) { + updateClusterMetrics(topologyId, topologyMetrics); + } + + // overwrite + metricCache.putMetricData(topologyId, topologyMetrics); + + // below process is kind of a transaction, first we lock an empty slot, mark it as PRE_SET + // by this time the slot is not yet ready for uploading as the upload thread looks for SET slots only + // after all metrics data has been saved, we mark it as SET, then it's ready for uploading. 
+ int idx = getAndPresetFirstEmptyIndex(); + if (idx >= 0) { + TopologyMetricDataInfo summary = new TopologyMetricDataInfo(); + summary.topologyId = topologyId; + summary.timestamp = event.timestamp; + if (topologyId.equals(JStormMetrics.NIMBUS_METRIC_KEY) || + topologyId.equals(JStormMetrics.CLUSTER_METRIC_KEY)) { + summary.type = MetricUploader.METRIC_TYPE_TOPLOGY; + } else { + if (topologyMetrics.get_topologyMetric().get_metrics_size() > 0 || + topologyMetrics.get_componentMetric().get_metrics_size() > 0) { + if (topologyMetrics.get_taskMetric().get_metrics_size() + + topologyMetrics.get_workerMetric().get_metrics_size() + + topologyMetrics.get_nettyMetric().get_metrics_size() + + topologyMetrics.get_streamMetric().get_metrics_size() > 0) { + summary.type = MetricUploader.METRIC_TYPE_ALL; + } else { + summary.type = MetricUploader.METRIC_TYPE_TOPLOGY; + } + } else { + summary.type = MetricUploader.METRIC_TYPE_TASK; + } + } + + metricCache.put(PENDING_UPLOAD_METRIC_DATA_INFO + idx, summary); + metricCache.put(PENDING_UPLOAD_METRIC_DATA + idx, topologyMetrics); + markSet(idx); + LOG.info("put metric data to local cache, topology:{}, idx:{}", topologyId, idx); + } else { + LOG.error("exceeding maxPendingUploadMetrics, skip metrics data for topology:{}", topologyId); + } + } else { + LOG.warn("topology {} has been killed or has not started, skip update.", topologyId); } - - for (Entry<String, MetricWindow> entry : streamMetrics.entrySet()) { - String streamName = entry.getKey(); - MetricWindow metric = entry.getValue(); - - AtomicInteger counter = counters.get(tag).get(streamName); - if (counter == null) { - continue; - + } + + /** + * get topology metrics, note that only topology & component & worker metrics are returned + */ + public TopologyMetric getTopologyMetric(String topologyId) { + long start = System.nanoTime(); + try { + TopologyMetric ret = new TopologyMetric(); + List<MetricInfo> topologyMetrics = metricCache.getMetricData(topologyId, 
MetaType.TOPOLOGY); + List<MetricInfo> componentMetrics = metricCache.getMetricData(topologyId, MetaType.COMPONENT); + List<MetricInfo> workerMetrics = metricCache.getMetricData(topologyId, MetaType.WORKER); + + MetricInfo dummy = MetricUtils.mkMetricInfo(); + if (topologyMetrics.size() > 0) { + // get the last min topology metric + ret.set_topologyMetric(topologyMetrics.get(topologyMetrics.size() - 1)); + } else { + ret.set_topologyMetric(dummy); } - - avgMetricWindow(metric, counter.get()); - } - } - - public void mergeTasks(TopologyMetric topologyMetric, String topologyId) { - Map<Integer, MetricInfo> taskMetrics = topologyMetric.get_taskMetric(); - - Map<Integer, String> taskToComponent = null; - try { - taskToComponent = Cluster.get_all_task_component(stormClusterState, topologyId, null); - } catch (Exception e) { - // TODO Auto-generated catch block - LOG.error("Failed to get taskToComponent"); - return ; - } - if (taskToComponent == null) { - LOG.error("Failed to get taskToComponent"); - return ; - } - - Map<String, MetricInfo> componentMetrics = topologyMetric.get_componentMetric(); - if (componentMetrics == null) { - componentMetrics = new HashMap<String, MetricInfo>(); - topologyMetric.set_componentMetric(componentMetrics); - } - - Map<String, AtomicInteger> componentTaskParallel = new HashMap<String, AtomicInteger>(); - Map<String, Map<String, AtomicInteger>> componentStreamParallel = new HashMap<String, Map<String, AtomicInteger>>(); - - for (Entry<Integer, MetricInfo> entry : taskMetrics.entrySet()) { - Integer taskId = entry.getKey(); - MetricInfo taskMetric = entry.getValue(); - - String component = taskToComponent.get(taskId); - if (component == null) { - LOG.error("Failed to get component of task " + taskId); - continue; + if (componentMetrics.size() > 0) { + ret.set_componentMetric(componentMetrics.get(0)); + } else { + ret.set_componentMetric(dummy); } - - MetricInfo componentMetric = componentMetrics.get(component); - - componentMetric = 
mergeMetricInfo(taskMetric, componentMetric, MetricDef.MERGE_SUM_TAG); - componentMetric = mergeMetricInfo(taskMetric, componentMetric, MetricDef.MERGE_AVG_TAG); - - Map<String, Map<String, MetricWindow>> input = mergeTaskStreams(componentMetric.get_inputMetric(), taskMetric.get_inputMetric(), componentStreamParallel); - componentMetric.set_inputMetric(input); - - Map<String, Map<String, MetricWindow>> output = mergeTaskStreams(componentMetric.get_outputMetric(), taskMetric.get_outputMetric(), componentStreamParallel); - componentMetric.set_outputMetric(output); - - componentMetrics.put(component, componentMetric); - - AtomicInteger counter = componentTaskParallel.get(component); - if (counter == null) { - counter = new AtomicInteger(0); - componentTaskParallel.put(component, counter); + if (workerMetrics.size() > 0) { + ret.set_workerMetric(workerMetrics.get(0)); + } else { + ret.set_workerMetric(dummy); } - - counter.incrementAndGet(); - } - - for (Entry<String, MetricInfo> entry : componentMetrics.entrySet()) { - String componentName = entry.getKey(); - MetricInfo metricInfo = entry.getValue(); - - AtomicInteger counter = componentTaskParallel.get(componentName); - - for (String tag : MetricDef.MERGE_AVG_TAG) { - MetricWindow metricWindow = metricInfo.get_baseMetric().get(tag); - - avgMetricWindow(metricWindow, counter.get()); - - avgStreams(metricInfo.get_inputMetric(), componentStreamParallel, tag); - avgStreams(metricInfo.get_outputMetric(), componentStreamParallel, tag); + ret.set_taskMetric(dummy); + ret.set_streamMetric(dummy); + ret.set_nettyMetric(dummy); + + return ret; + } finally { + long end = System.nanoTime(); + SimpleJStormMetric.updateNimbusHistogram("getTopologyMetric", (end - start) / TimeUtils.NS_PER_US); + } + } + + public static String getWorkerSlotName(String hostname, Integer port) { + return hostname + ":" + port; + } + + class RefreshTopologiesThread extends RunnableCallback { + @Override + public void run() { + if (!isShutdown.get()) { 
+ pushEvent(new Refresh()); } } + + @Override + public Object getResult() { + return TimeUtils.SEC_PER_MIN; + } + + @Override + public String getThreadName() { + return "RefreshThread"; + } } - - public void mergeComponent(TopologyMetric topologyMetric) { - MetricInfo topologyMetricInfo = MetricThrift.mkMetricInfo(); - topologyMetric.set_topologyMetric(topologyMetricInfo); - Map<String, MetricInfo> componentMetrics = topologyMetric.get_componentMetric(); - if (componentMetrics == null) { - return; + + class MetricsUploadThread extends Thread { + public MetricsUploadThread() { + setName("main-upload-thread"); } - - for (MetricInfo componentMetric : componentMetrics.values()) { - topologyMetricInfo = mergeMetricInfo(componentMetric, topologyMetricInfo, MetricDef.MERGE_SUM_TAG); - } - - topologyMetric.set_topologyMetric(topologyMetricInfo); - } - - public void mergeTopology(TopologyMetric topologyMetric, WorkerUploadMetrics workerMetrics) { - String topologyId = workerMetrics.get_topology_id(); - - Map<Integer, MetricInfo> taskMetrics = topologyMetric.get_taskMetric(); - if (taskMetrics == null) { - taskMetrics = new HashMap<Integer, MetricInfo>(); - topologyMetric.set_taskMetric(taskMetrics); - } - taskMetrics.putAll(workerMetrics.get_taskMetric()); - - String hostname = getWorkerHostname(workerMetrics); - topologyMetric.put_to_workerMetric(getWorkerSlotName(hostname, workerMetrics.get_port()), workerMetrics.get_workerMetric()); - - } - - public void mergeNetty(WorkerUploadMetrics workerMetric, String topologyId, Set<String> connections) { - - if (topologyNettyMgr.getTopology(topologyId) == false) { - return ; - } - Map<String, MetricInfo> connectionMetrics = workerMetric.get_nettyMetric().get_connections(); - for (Entry<String, MetricInfo> entry : connectionMetrics.entrySet()) { - String connectionName = entry.getKey(); - MetricInfo metric = entry.getValue(); - - MetricInfo cacheMetric = (MetricInfo)dbCache.get(getNettyConnectionKey(topologyId, connectionName)); - 
cacheMetric = MetricThrift.mergeMetricInfo(metric, cacheMetric); - - connections.add(connectionName); - - dbCache.put(getNettyConnectionKey(topologyId, connectionName), cacheMetric); - } - } - - public void mergeNetty(String topologyId, Set<String> connections) { - if (topologyNettyMgr.getTopology(topologyId) == false) { - LOG.info("Skip merge netty detail metrics"); - return ; - } - // @@@ - // this function will cost much memory when worker number is more than 200 - Map<String, MetricInfo> metricMap = new TreeMap<String, MetricInfo>(); - - for (String connection : connections) { - MetricInfo cacheMetric = (MetricInfo)dbCache.get(getNettyConnectionKey(topologyId, connection)); - if (cacheMetric == null) { - LOG.warn("Failed to get cacheMetric of {}:{}", topologyId, connection ); - continue; - } - - metricMap.put(connection, cacheMetric); - dbCache.remove(getNettyConnectionKey(topologyId, connection)); - } - - dbCache.put(getNettyTopologyKey(topologyId), metricMap); - // accelerate free memory - metricMap.clear(); - } - - public void render() { - for (Entry<String, Set<String>> entry : topologyWorkers.entrySet()) { - String topologyId = entry.getKey(); - Set<String> workers = entry.getValue(); - Set<String> connections = new TreeSet<String>(); - - TopologyMetric topologyMetric = new TopologyMetric(); - - boolean isExistWorker = false; - for (String workerId : workers) { - WorkerUploadMetrics workerMetric = (WorkerUploadMetrics) dbCache.get(getWorkerKey(topologyId, workerId)); - if (workerMetric == null) { - LOG.warn("Failed to get WorkerUploadMetrics of " + getWorkerKey(topologyId, workerId)); - continue; + + @Override + public void run() { + while (!isShutdown.get()) { + try { + if (metricUploader != null && nimbusData.isLeader()) { + final int idx = getFirstPendingUploadIndex(); + if (idx >= 0) { + markUploading(idx); + upload(clusterName, idx); + } + } + JStormUtils.sleepMs(5); + } catch (Exception ex) { + LOG.error("Error", ex); } - isExistWorker = true; - 
mergeTopology(topologyMetric, workerMetric); - - mergeNetty(workerMetric, topologyId, connections); } - if (isExistWorker == false) { - LOG.info("No worker metrics of {}", topologyId); - continue; + } + + public boolean upload(final String clusterName, final int idx) { + final TopologyMetricDataInfo summary = getMetricDataInfoFromCache(idx); + if (summary == null) { + LOG.warn("metric summary is null from cache idx:{}", idx); + markUploaded(idx); + return true; } - - mergeTasks(topologyMetric, topologyId); - - mergeComponent(topologyMetric); - - - dbCache.put(getTopologyKey(topologyId), topologyMetric); - - mergeNetty(topologyId, connections); - - LOG.info("Successfully render topologyId of " + topologyId); - - uploadToAlimonitor(topologyMetric, topologyId); - - cleanDeadSupervisorWorker(topologyMetric); - - - try { - - //LOG.info(topologyId + " metrics is :\n" + Utils.toPrettyJsonString(topologyMetric)); - LOG.info(topologyId + " finish metric"); - stormClusterState.set_topology_metric(topologyId, topologyMetric); - LOG.info("Successfully uploaded toplogy metrics: " + topologyId); - } catch (Exception e) { - // TODO Auto-generated catch block - LOG.info("Failed to upload toplogy metrics: " + topologyId, e); - continue; + + final String topologyId = summary.topologyId; + if (!isTopologyAlive(topologyId)) { + LOG.warn("topology {} is not alive, skip sending metrics.", topologyId); + markUploaded(idx); + return true; } - + + return metricUploader.upload(clusterName, topologyId, idx, summary.toMap()); } } - - public void handleUpdateEvent(Update event) { - long start = System.currentTimeMillis(); - - WorkerUploadMetrics workerMetrics = event.workerMetrics; - - String topologyId = workerMetrics.get_topology_id(); - if (removing.containsKey(topologyId) == true) { - LOG.info("Topology " + topologyId + " has been removed, skip update"); - return; + + class FlushMetricMetaThread extends Thread { + + public FlushMetricMetaThread() { + setName("FlushMetricMetaThread"); } - - 
Set<String> workers = topologyWorkers.get(topologyId); - if (workers == null) { - workers = new HashSet<String>(); - topologyWorkers.put(topologyId, workers); - } - - String workerSlot = getWorkerSlotName(workerMetrics.get_supervisor_id(), workerMetrics.get_port()); - - workers.add(workerSlot); - dbCache.put(getWorkerKey(topologyId, workerSlot), workerMetrics); - - long end = System.currentTimeMillis(); - - updateHistogram.update((end - start)); - } - - public void uploadToAlimonitor(TopologyMetric topologyMetric, String topologyId) { - // @@@ TODO - } - - - public TopologyMetric getTopologyMetric(String topologyId) { - long start = System.nanoTime(); - try { - TopologyMetric ret = (TopologyMetric) dbCache.get(getTopologyKey(topologyId)); - if (ret == null) { - return emptyTopologyMetric; - } else { - return ret; + + @Override + public void run() { + while (!isShutdown.get()) { + long start = System.currentTimeMillis(); + try { + // if metricUploader is not fully initialized, return directly + if (nimbusData.isLeader() && metricUploader != null) { + for (Map.Entry<String, TopologyMetricContext> entry : topologyMetricContexts.entrySet()) { + String topologyId = entry.getKey(); + TopologyMetricContext metricContext = entry.getValue(); + + Map<String, Long> cachedMeta = metricCache.getMeta(topologyId); + if (cachedMeta == null) { + cachedMeta = new HashMap<>(); + } + Map<String, Long> memMeta = metricContext.getMemMeta(); + if (memMeta.size() > cachedMeta.size()) { + cachedMeta.putAll(memMeta); + } + metricCache.putMeta(topologyId, cachedMeta); + + int curSize = cachedMeta.size(); + if (curSize != metricContext.getFlushedMetaNum()) { + metricContext.setFlushedMetaNum(curSize); + + metricUploader.registerMetrics(clusterName, topologyId, cachedMeta); + LOG.info("flush metric meta, topology:{}, total:{}, cost:{}.", + topologyId, curSize, System.currentTimeMillis() - start); + } + stormClusterState.set_topology_metric(topologyId, curSize); + } + } + + 
JStormUtils.sleepMs(15000); + } catch (Exception ex) { + LOG.error("Error", ex); + } } - }finally { - long end = System.nanoTime(); - - SimpleJStormMetric.updateHistorgram("getTopologyMetric", (end - start)/1000000.0d); } } - - public SortedMap<String, MetricInfo> getNettyMetric(String topologyId) { - TreeMap<String, MetricInfo> ret = (TreeMap<String, MetricInfo>)dbCache.get(getNettyTopologyKey(topologyId)); - if (ret == null) { - return emptyNettyMetric; - }else { + + public static class TopologyMetricDataInfo implements Serializable { + private static final long serialVersionUID = 1303262512351757610L; + + public String topologyId; + public String type; // "tp" for tp/comp metrics OR "task" for task/stream/worker/netty metrics + public long timestamp; // metrics report time + + public Map<String, Object> toMap() { + Map<String, Object> ret = new HashMap<String, Object>(); + ret.put(MetricUploader.METRIC_TIME, timestamp); + ret.put(MetricUploader.METRIC_TYPE, type); + return ret; } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); + } } - - public static String getWorkerSlotName(String hostname, Integer port) { - return hostname + ":" + port; + + // ============================================== + // =================== events =================== + // ============================================== + public static class Event { + protected Event() { + } + + public String clusterName; + public String topologyId; + public long timestamp; + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); + } + } + + public static class Update extends Event { + public TopologyMetric topologyMetrics; } - - public static String getWorkerKey(String topologyId, String workerSlot) { - return CACHE_NAMESPACE_METRIC + "@" + topologyId + "@" + workerSlot; + + public static class Remove extends Event { } - - public static String 
getTopologyKey(String topologyId) { - return CACHE_NAMESPACE_METRIC + "@" + topologyId; + + public static class Refresh extends Event { } - - public static String getNettyConnectionKey(String topologyId, String connection) { - return CACHE_NAMESPACE_NETTY + "@" + topologyId + "@" + connection; + + + public static class KillTopologyEvent extends Event { } - - public static String getNettyTopologyKey(String topologyId) { - return CACHE_NAMESPACE_NETTY + "@" + topologyId; + + public static class StartTopologyEvent extends Event { + public double sampleRate; + } + + public static class TaskDeadEvent extends Event { + public Map<Integer, ResourceWorkerSlot> deadTasks; + } + + public static class TaskStartEvent extends Event { + public Assignment oldAssignment; + public Assignment newAssignment; + public Map<Integer, String> task2Component; } - - }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyNettyMgr.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyNettyMgr.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyNettyMgr.java index 7eaccab..6e55049 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyNettyMgr.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyNettyMgr.java @@ -1,105 +1,78 @@ package com.alibaba.jstorm.daemon.nimbus; -import java.util.Map; - +import backtype.storm.Config; +import backtype.storm.generated.InvalidTopologyException; +import com.alibaba.jstorm.cluster.Common; +import com.alibaba.jstorm.cluster.StormConfig; +import com.alibaba.jstorm.metric.MetricUtils; import org.jboss.netty.util.internal.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.alibaba.jstorm.client.ConfigExtension; -import com.alibaba.jstorm.cluster.Common; -import com.alibaba.jstorm.cluster.StormConfig; -import com.alibaba.jstorm.utils.JStormUtils; - -import backtype.storm.Config; -import backtype.storm.generated.InvalidTopologyException; +import java.util.Map; public class TopologyNettyMgr { - private static Logger LOG = LoggerFactory.getLogger(TopologyNettyMgr.class); - private boolean defaultEnable = true; - private Map nimbusConf; - private ConcurrentHashMap<String, Boolean> setting = new ConcurrentHashMap<String, Boolean>(); - private static final int WORKER_DISABLE_THREADHOLD = 200; - - public TopologyNettyMgr(Map conf) { - nimbusConf = conf; - - Boolean isEnable = ConfigExtension.isEnableTopologyNettyMetrics(conf); - if (isEnable != null) { - defaultEnable = isEnable; - } - - LOG.info("Default netty metrics setting is " + defaultEnable); - } - - protected boolean getTopology(Map conf) { - Boolean isEnable = 
ConfigExtension.isEnableTopologyNettyMetrics(conf); - if (isEnable != null) { - return isEnable; - } - - int workerNum = JStormUtils.parseInt(conf.get(Config.TOPOLOGY_WORKERS), 1); - if (workerNum <= WORKER_DISABLE_THREADHOLD) { - isEnable = Boolean.TRUE; - }else { - isEnable = Boolean.FALSE; - } - - return isEnable; - } - - public boolean getTopology(String topologyId) { - try { - String topologyName = Common.topologyIdToName(topologyId); - - Boolean isEnable = setting.get(topologyName); - if (isEnable != null) { - return isEnable; - } - - Map topologyConf = - StormConfig.read_nimbus_topology_conf(nimbusConf, topologyId); - - isEnable = getTopology(topologyConf); - setting.put(topologyName, isEnable); - LOG.info("{} netty metrics setting is {}", topologyName, isEnable); - return isEnable; - - }catch(Exception e) { - LOG.info("Failed to get {} netty metrics setting ", topologyId); - return defaultEnable; - } - - - } - - public void setTopology(Map conf) { - String topologyName = (String)conf.get(Config.TOPOLOGY_NAME); - if (topologyName == null) { - LOG.info("No topologyName setting"); - return ; - } - - boolean isEnable = getTopology(conf); - - setting.put(topologyName, isEnable); - - LOG.info("{} netty metrics setting is {}", topologyName, isEnable); - return ; - - } - - public void rmTopology(String topologyId) { - String topologyName; - try { - topologyName = Common.topologyIdToName(topologyId); - setting.remove(topologyName); - LOG.info("Remove {} netty metrics setting ", topologyName); - } catch (InvalidTopologyException e) { - // TODO Auto-generated catch block - - } - - } + private static Logger LOG = LoggerFactory.getLogger(TopologyNettyMgr.class); + private Map nimbusConf; + private ConcurrentHashMap<String, Boolean> setting = new ConcurrentHashMap<String, Boolean>(); + + public TopologyNettyMgr(Map conf) { + nimbusConf = conf; + + } + + protected boolean getTopology(Map conf) { + return MetricUtils.isEnableNettyMetrics(conf); + } + + public boolean 
getTopology(String topologyId) { + try { + String topologyName = Common.topologyIdToName(topologyId); + + Boolean isEnable = setting.get(topologyName); + if (isEnable != null) { + return isEnable; + } + + Map topologyConf = StormConfig.read_nimbus_topology_conf(nimbusConf, topologyId); + + isEnable = getTopology(topologyConf); + setting.put(topologyName, isEnable); + LOG.info("{} netty metrics setting is {}", topologyName, isEnable); + return isEnable; + + } catch (Exception e) { + LOG.info("Failed to get {} netty metrics setting ", topologyId); + return true; + } + + } + + public void setTopology(Map conf) { + String topologyName = (String) conf.get(Config.TOPOLOGY_NAME); + if (topologyName == null) { + LOG.info("No topologyName setting"); + return; + } + + boolean isEnable = getTopology(conf); + + setting.put(topologyName, isEnable); + + LOG.info("{} netty metrics setting is {}", topologyName, isEnable); + return; + + } + + public void rmTopology(String topologyId) { + String topologyName; + try { + topologyName = Common.topologyIdToName(topologyId); + setting.remove(topologyName); + LOG.info("Remove {} netty metrics setting ", topologyName); + } catch (InvalidTopologyException ignored) { + } + + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/AlimonitorClient.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/AlimonitorClient.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/AlimonitorClient.java new file mode 100644 index 0000000..78bb1d2 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/AlimonitorClient.java @@ -0,0 +1,226 @@ +package com.alibaba.jstorm.daemon.nimbus.metric.uploader; + +import backtype.storm.generated.TopologyMetric; +import org.apache.http.HttpEntity; +import 
org.apache.http.NameValuePair; +import org.apache.http.client.entity.UrlEncodedFormEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.message.BasicNameValuePair; +import org.apache.http.util.EntityUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class AlimonitorClient extends DefaultMetricUploader { + + public static Logger LOG = LoggerFactory.getLogger(AlimonitorClient.class); + + // Send to localhost:15776 by default + public static final String DEFAUT_ADDR = "127.0.0.1"; + public static final String DEFAULT_PORT = "15776"; + public static final int DEFAUTL_FLAG = 0; + public static final String DEFAULT_ERROR_INFO = ""; + + private final String COLLECTION_FLAG = "collection_flag"; + private final String ERROR_INFO = "error_info"; + private final String MSG = "MSG"; + + private String port; + private String requestIP; + private String monitorName; + private int collectionFlag; + private String errorInfo; + + private boolean post; + + public AlimonitorClient() { + } + + public AlimonitorClient(String requestIP, String port, boolean post) { + this.requestIP = requestIP; + this.port = port; + this.post = post; + this.monitorName = null; + this.collectionFlag = 0; + this.errorInfo = null; + } + + public void setIpAddr(String ipAddr) { + this.requestIP = ipAddr; + } + + public void setPort(String port) { + this.port = port; + } + + public void setMonitorName(String monitorName) { + this.monitorName = monitorName; + } + + public void setCollectionFlag(int flag) { + this.collectionFlag = flag; + } + + public void setErrorInfo(String msg) { + this.errorInfo = msg; 
+ } + + public void setPostFlag(boolean post) { + this.post = post; + } + + public String buildURL() { + return "http://" + requestIP + ":" + port + "/passive"; + } + + public String buildRqstAddr() { + return "http://" + requestIP + ":" + port + "/passive?name=" + monitorName + "&msg="; + } + + + public Map buildAliMonitorMsg(int collection_flag, String error_message) { + // Json format of the message sent to Alimonitor + // { + // "collection_flag":int, + // "error_info":string, + // "MSG": ojbect | array + // } + Map ret = new HashMap(); + ret.put(COLLECTION_FLAG, collection_flag); + ret.put(ERROR_INFO, error_message); + ret.put(MSG, null); + + return ret; + } + + private void addMsgData(Map jsonObj, Map<String, Object> map) { + jsonObj.put(MSG, map); + } + + private boolean sendRequest(int collection_flag, String error_message, Map<String, Object> msg) throws Exception { + boolean ret = false; + + if (msg.size() == 0) + return ret; + + Map jsonObj = buildAliMonitorMsg(collection_flag, error_message); + addMsgData(jsonObj, msg); + String jsonMsg = jsonObj.toString(); + LOG.info(jsonMsg); + + if (post == true) { + String url = buildURL(); + ret = httpPost(url, jsonMsg); + } else { + String request = buildRqstAddr(); + StringBuilder postAddr = new StringBuilder(); + postAddr.append(request); + postAddr.append(URLEncoder.encode(jsonMsg)); + + ret = httpGet(postAddr); + } + + return ret; + } + + private boolean httpGet(StringBuilder postAddr) { + boolean ret = false; + + CloseableHttpClient httpClient = HttpClientBuilder.create().build(); + CloseableHttpResponse response = null; + + try { + HttpGet request = new HttpGet(postAddr.toString()); + response = httpClient.execute(request); + HttpEntity entity = response.getEntity(); + if (entity != null) { + LOG.info(EntityUtils.toString(entity)); + } + EntityUtils.consume(entity); + ret = true; + } catch (Exception e) { + LOG.error("Exception when sending http request to alimonitor", e); + } finally { + try { + if 
(response != null) + response.close(); + httpClient.close(); + } catch (Exception e) { + LOG.error("Exception when closing httpclient", e); + } + } + + return ret; + } + + private boolean httpPost(String url, String msg) { + boolean ret = false; + + CloseableHttpClient httpClient = HttpClientBuilder.create().build(); + CloseableHttpResponse response = null; + + try { + HttpPost request = new HttpPost(url); + List<NameValuePair> nvps = new ArrayList<NameValuePair>(); + nvps.add(new BasicNameValuePair("name", monitorName)); + nvps.add(new BasicNameValuePair("msg", msg)); + request.setEntity(new UrlEncodedFormEntity(nvps)); + response = httpClient.execute(request); + HttpEntity entity = response.getEntity(); + if (entity != null) { + LOG.info(EntityUtils.toString(entity)); + } + EntityUtils.consume(entity); + ret = true; + } catch (Exception e) { + LOG.error("Exception when sending http request to alimonitor", e); + } finally { + try { + if (response != null) + response.close(); + httpClient.close(); + } catch (Exception e) { + LOG.error("Exception when closing httpclient", e); + } + } + + return ret; + } + + + protected Map<String, Object> convertMap(String clusterName, String topologyId, TopologyMetric tpMetric) { + /** + * @@@ Todo + */ + return null; + } + + @Override + public boolean upload(String clusterName, String topologyId, TopologyMetric tpMetric, Map<String, Object> metricContext) { + // TODO Auto-generated method stub + Map<String, Object> metricMap = convertMap(clusterName, topologyId, tpMetric); + if (metricMap == null || metricMap.isEmpty() == true) { + return false; + } + + try { + sendRequest(collectionFlag, errorInfo, metricMap); + return true; + } catch (Exception e) { + // TODO Auto-generated catch block + LOG.error("Failed upload metric to Alimonitor", e); + return false; + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/DefaultMetricUploader.java 
---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/DefaultMetricUploader.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/DefaultMetricUploader.java new file mode 100644 index 0000000..58e4e7d --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/DefaultMetricUploader.java @@ -0,0 +1,71 @@ +package com.alibaba.jstorm.daemon.nimbus.metric.uploader; + +import backtype.storm.generated.TopologyMetric; +import com.alibaba.jstorm.daemon.nimbus.NimbusData; +import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable; +import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable.Event; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +public class DefaultMetricUploader implements MetricUploader { + private final Logger logger = LoggerFactory.getLogger(getClass()); + protected NimbusData nimbusData; + protected TopologyMetricsRunnable metricsRunnable; + + public DefaultMetricUploader() { + } + + @Override + public void init(NimbusData nimbusData) throws Exception { + this.nimbusData = nimbusData; + this.metricsRunnable = nimbusData.getMetricRunnable(); + } + + @Override + public void cleanup() { + } + + @Override + public boolean registerMetrics(String clusterName, String topologyId, + Map<String, Long> metrics) { + if (metrics.size() > 0) { + logger.info("register metrics, topology:{}, total:{}", topologyId, metrics.size()); + } + return true; + } + + @Override + public boolean upload(String clusterName, String topologyId, TopologyMetric tpMetric, Map<String, Object> metricContext) { + if (tpMetric == null) { + logger.info("No metric of {}", topologyId); + return true; + } + + int totalSize = tpMetric.get_topologyMetric().get_metrics_size() + + tpMetric.get_componentMetric().get_metrics_size() + + tpMetric.get_taskMetric().get_metrics_size() + + 
package com.alibaba.jstorm.daemon.nimbus.metric.uploader;

import backtype.storm.generated.TopologyMetric;
import com.alibaba.jstorm.daemon.nimbus.NimbusData;
import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable;

import java.util.Map;

/**
 * Pluggable sink for nimbus-side topology metrics. Implementations receive
 * lifecycle callbacks ({@link #init}/{@link #cleanup}) plus registration,
 * upload and event notifications from {@link TopologyMetricsRunnable}.
 */
public interface MetricUploader {
    // Keys/values for the metricContext map passed to the upload methods.
    // (Constant names and values are public API and kept exactly as-is,
    // including the historical "TOPLOGY" spelling.)
    String METRIC_TYPE = "metric.type";
    String METRIC_TYPE_TOPLOGY = "TP";
    String METRIC_TYPE_TASK = "TASK";
    String METRIC_TYPE_ALL = "ALL";
    String METRIC_TIME = "metric.timestamp";

    /**
     * Set NimbusData to MetricUploader
     */
    void init(NimbusData nimbusData) throws Exception;

    /** Releases any resources acquired in {@link #init}. */
    void cleanup();

    /**
     * register metrics to external metric plugin
     *
     * @param metrics metric name to metric id
     * @return true on success
     */
    boolean registerMetrics(String clusterName, String topologyId,
                            Map<String, Long> metrics) throws Exception;

    /**
     * upload topologyMetric to external metric plugin (such as database plugin)
     *
     * @return true means success, false means failure
     */
    boolean upload(String clusterName, String topologyId, TopologyMetric tpMetric, Map<String, Object> metricContext);

    /**
     * upload metrics with given key and metric context. the implementation can retrieve metric data from rocks db
     * in the handler thread, which is kind of lazy-init, making it more GC-friendly
     */
    boolean upload(String clusterName, String topologyId, Object key, Map<String, Object> metricContext);

    /**
     * Send an event to underlying handler
     */
    boolean sendEvent(String clusterName, TopologyMetricsRunnable.Event event);
}
