http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetricsReporter.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetricsReporter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetricsReporter.java index 489bec8..5039c87 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetricsReporter.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetricsReporter.java @@ -17,390 +17,300 @@ */ package com.alibaba.jstorm.metric; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import backtype.storm.Config; -import backtype.storm.LocalCluster; import backtype.storm.generated.MetricInfo; -import backtype.storm.generated.MetricWindow; -import backtype.storm.generated.NettyMetric; +import backtype.storm.generated.TopologyMetric; import backtype.storm.generated.WorkerUploadMetrics; +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.OutputCollector; +import backtype.storm.tuple.Values; import backtype.storm.utils.NimbusClient; -import backtype.storm.utils.Utils; - +import com.alibaba.jstorm.callback.AsyncLoopThread; import com.alibaba.jstorm.callback.RunnableCallback; import com.alibaba.jstorm.client.ConfigExtension; -import com.alibaba.jstorm.cluster.StormClusterState; +import com.alibaba.jstorm.cluster.Common; import com.alibaba.jstorm.cluster.StormConfig; -import com.alibaba.jstorm.common.metric.Gauge; -import com.alibaba.jstorm.common.metric.MetricFilter; -import com.alibaba.jstorm.common.metric.MetricRegistry; -import com.alibaba.jstorm.common.metric.window.Metric; +import com.alibaba.jstorm.common.metric.AsmMetric; +import com.alibaba.jstorm.daemon.nimbus.NimbusData; +import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable.Update; import com.alibaba.jstorm.daemon.worker.WorkerData; -import com.alibaba.jstorm.utils.JStormUtils; -import com.codahale.metrics.health.HealthCheck; -import com.codahale.metrics.health.HealthCheckRegistry; - -public class JStormMetricsReporter extends RunnableCallback { - private static final Logger LOG = LoggerFactory - .getLogger(JStormMetricsReporter.class); - - private MetricRegistry workerMetrics = JStormMetrics.workerMetrics; - private Map<Integer, MetricRegistry> taskMetrics = - JStormMetrics.taskMetrics; - private MetricRegistry skipMetrics = JStormMetrics.skipMetrics; +import com.alibaba.jstorm.utils.JStormServerUtils; +import com.alibaba.jstorm.utils.TimeUtils; +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; - private JStormMetricFilter inputFilter; +import java.util.*; - private JStormMetricFilter outputFilter; +/** + * report metrics from worker to nimbus server. this class serves as an object in Worker/Nimbus/Supervisor. + * when in Worker, it reports data via netty transport; otherwise reports via thrift. + * <p/> + * there are 2 threads: + * 1.flush thread: check every 1 sec, when current time is aligned to 1 min, flush all metrics to snapshots + * 2.check meta thread: use thrift to get metric id from nimbus server. + * + * @author Cody ([email protected]) + * @since 2.0.5 + */ +public class JStormMetricsReporter { + private static final Logger LOG = LoggerFactory.getLogger(JStormMetricsReporter.class); private Map conf; - private String topologyId; - private String supervisorId; - private int port; - private int frequence; - - private StormClusterState clusterState; - private boolean localMode = false; - private NimbusClient client; - - public JStormMetricsReporter(WorkerData workerData) { - this.conf = workerData.getStormConf(); - this.topologyId = (String) conf.get(Config.TOPOLOGY_ID); - this.supervisorId = workerData.getSupervisorId(); - this.port = workerData.getPort(); - this.frequence = ConfigExtension.getWorkerMetricReportFrequency(conf); - this.clusterState = workerData.getZkCluster(); - - outputFilter = new JStormMetricFilter(MetricDef.OUTPUT_TAG); - inputFilter = new JStormMetricFilter(MetricDef.INPUT_TAG); - localMode = StormConfig.local_mode(conf); - LOG.info("Successfully start "); - } + protected String clusterName; + protected String topologyId; + protected String host; + protected int port; - protected boolean getMoreMetric( - Map<String, Map<String, MetricWindow>> extraMap, - JStormMetricFilter metricFilter, String metricFullName, - Map<Integer, Double> metricWindow) { - if (metricFilter.matches(metricFullName, null) == false) { - return false; - } + protected boolean localMode = false; - int pos = metricFullName.indexOf(MetricRegistry.NAME_SEPERATOR); - if (pos <= 0 || pos >= metricFullName.length() - 1) { - return false; - } + private AsyncLoopThread checkMetricMetaThread; + protected final int checkMetaThreadCycle; - String metricName = metricFullName.substring(0, pos); - String extraName = metricFullName.substring(pos + 1); + private AsyncLoopThread flushMetricThread; + protected final int flushMetricThreadCycle; - Map<String, MetricWindow> item = extraMap.get(metricName); - if (item == null) { - item = new HashMap<String, MetricWindow>(); - extraMap.put(metricName, item); - } + private boolean test = false; - MetricWindow metricWindowThrift = new MetricWindow(); - metricWindowThrift.set_metricWindow(metricWindow); + private boolean isInWorker = false; - item.put(extraName, metricWindowThrift); + private SpoutOutputCollector spoutOutput; + private OutputCollector boltOutput; - return true; - } - - protected void insertNettyMetrics(Map<String, MetricInfo> nettyMetricInfo, - Map<Integer, Double> snapshot, - String metricFullName) { - int pos = metricFullName.indexOf(MetricRegistry.NAME_SEPERATOR); - if (pos < 0 || pos >= metricFullName.length() - 1) { - return ; - } - - String realHeader = metricFullName.substring(0, pos); - String nettyConnection = metricFullName.substring(pos + 1); - - MetricInfo metricInfo = nettyMetricInfo.get(nettyConnection); - if (metricInfo == null) { - metricInfo = MetricThrift.mkMetricInfo(); - - nettyMetricInfo.put(nettyConnection, metricInfo); - } - - MetricThrift.insert(metricInfo, realHeader, snapshot); - } - - protected void insertMergeList(Map<String, List<Map<Integer, Double> > > mergeMap, - List<String> mergeList, - Map<Integer, Double> snapshot, - String name) { - for (String tag : mergeList) { - if (name.startsWith(tag) == false) { - continue; - } - List<Map<Integer, Double> > list = mergeMap.get(tag); - if (list == null) { - list = new ArrayList<Map<Integer,Double>>(); - mergeMap.put(tag, list); - } - - list.add(snapshot); - - } - } - - protected void doMergeList(MetricInfo workerMetricInfo, - Map<String, List<Map<Integer, Double> > > mergeMap) { - for (Entry<String, List<Map<Integer, Double> > > entry : mergeMap.entrySet()) { - String name = entry.getKey(); - List<Map<Integer, Double>> list = entry.getValue(); - - Map<Integer, Double> merged = JStormUtils.mergeMapList(list); - - MetricThrift.insert(workerMetricInfo, name, merged); - } - } + private boolean enableMetrics; + private NimbusClient client = null; - public MetricInfo computWorkerMetrics() { - MetricInfo workerMetricInfo = MetricThrift.mkMetricInfo(); - Map<String, MetricInfo> nettyMetricInfo = new HashMap<String, MetricInfo>(); - - Map<String, List<Map<Integer, Double> > > mergeMap = - new HashMap<String, List<Map<Integer,Double> > >(); - List<String> mergeList = new ArrayList<String>(); - mergeList.add(MetricDef.NETTY_CLI_SEND_SPEED); - - Map<String, Metric> workerMetricMap = workerMetrics.getMetrics(); - for (Entry<String, Metric> entry : workerMetricMap.entrySet()) { - String name = entry.getKey(); - Map<Integer, Double> snapshot = entry.getValue().getSnapshot(); - - if (MetricDef.isNettyDetails(name) == false) { - MetricThrift.insert(workerMetricInfo, name, snapshot); - continue; - } - - insertNettyMetrics(nettyMetricInfo, snapshot, name); - - insertMergeList(mergeMap, mergeList, snapshot, name); - + public JStormMetricsReporter(Object role) { + LOG.info("starting jstorm metrics reporter"); + if (role instanceof WorkerData) { + WorkerData workerData = (WorkerData) role; + this.conf = workerData.getStormConf(); + this.topologyId = (String) conf.get(Config.TOPOLOGY_ID); + this.port = workerData.getPort(); + this.isInWorker = true; + } else if (role instanceof NimbusData) { + NimbusData nimbusData = (NimbusData) role; + this.conf = nimbusData.getConf(); + this.topologyId = JStormMetrics.NIMBUS_METRIC_KEY; } - - doMergeList(workerMetricInfo, mergeMap); - - JStormMetrics.setExposeWorkerMetrics(workerMetricInfo); - JStormMetrics.setExposeNettyMetrics(nettyMetricInfo); - return workerMetricInfo; - } - - public boolean isTaskQueueFull(Metric metric, - Map<Integer, Double> snapshot, String name) { - if (metric instanceof Gauge) { - if (MetricDef.TASK_QUEUE_SET.contains(name)) { - for (Entry<Integer, Double> entry : snapshot.entrySet()) { - if (entry.getValue() == MetricDef.FULL_RATIO) { - return true; - } - } - } + this.host = JStormMetrics.getHost(); + this.enableMetrics = JStormMetrics.isEnabled(); + if (!enableMetrics) { + LOG.warn("***** topology metrics is disabled! *****"); + } else { + LOG.info("topology metrics is enabled."); } - return false; - } - - public Map<Integer, MetricInfo> computeTaskMetrics() { - Map<Integer, MetricInfo> ret = new HashMap<Integer, MetricInfo>(); + this.checkMetaThreadCycle = 30; + // flush metric snapshots when time is aligned, check every sec. + this.flushMetricThreadCycle = 1; - for (Entry<Integer, MetricRegistry> entry : taskMetrics.entrySet()) { - Integer taskId = entry.getKey(); - MetricRegistry taskMetrics = entry.getValue(); + LOG.info("check meta thread freq:{}, flush metrics thread freq:{}", checkMetaThreadCycle, flushMetricThreadCycle); - Map<String, Map<String, MetricWindow>> inputMap = - new HashMap<String, Map<String, MetricWindow>>(); - Map<String, Map<String, MetricWindow>> outputMap = - new HashMap<String, Map<String, MetricWindow>>(); - - MetricInfo taskMetricInfo = MetricThrift.mkMetricInfo(); - taskMetricInfo.set_inputMetric(inputMap); - taskMetricInfo.set_outputMetric(outputMap); - ret.put(taskId, taskMetricInfo); - - for (Entry<String, Metric> metricEntry : taskMetrics.getMetrics() - .entrySet()) { - String name = metricEntry.getKey(); - Metric metric = metricEntry.getValue(); - Map<Integer, Double> snapshot = metric.getSnapshot(); - - boolean isInput = - getMoreMetric(inputMap, inputFilter, name, snapshot); - boolean isOutput = - getMoreMetric(outputMap, outputFilter, name, snapshot); - - if (isInput == false && isOutput == false) { - MetricThrift.insert(taskMetricInfo, name, snapshot); - } - } - - MetricThrift.merge(taskMetricInfo, inputMap); - MetricThrift.merge(taskMetricInfo, outputMap); - - } + this.localMode = StormConfig.local_mode(conf); + this.clusterName = ConfigExtension.getClusterName(conf); + LOG.info("done."); + } - JStormMetrics.setExposeTaskMetrics(ret); - return ret; + @VisibleForTesting + JStormMetricsReporter() { + LOG.info("Successfully started jstorm metrics reporter for test."); + this.test = true; + this.flushMetricThreadCycle = 1; + this.checkMetaThreadCycle = 30; } - - public void healthCheck(Integer taskId, HealthCheckRegistry healthCheck) { - if (taskId == null) { - return ; - } - - final Map<String, HealthCheck.Result> results = - healthCheck.runHealthChecks(); - for (Entry<String, HealthCheck.Result> resultEntry : results - .entrySet()) { - HealthCheck.Result result = resultEntry.getValue(); - if (result.isHealthy() == false) { - LOG.warn("{}:{}", taskId, result.getMessage()); - try { - clusterState.report_task_error(topologyId, taskId, - result.getMessage()); - } catch (Exception e) { - // TODO Auto-generated catch block - LOG.error(e.getMessage(), e); - } - } + public void init() { + if (!localMode && enableMetrics) { + this.checkMetricMetaThread = new AsyncLoopThread(new CheckMetricMetaThread()); + this.flushMetricThread = new AsyncLoopThread(new FlushMetricThread()); } } - public void healthCheck() { - Integer firstTask = null; - - Map<Integer, HealthCheckRegistry> taskHealthCheckMap = - JStormHealthCheck.getTaskhealthcheckmap(); - - for (Entry<Integer, HealthCheckRegistry> entry : taskHealthCheckMap - .entrySet()) { - Integer taskId = entry.getKey(); - HealthCheckRegistry taskHealthCheck = entry.getValue(); - - healthCheck(taskId, taskHealthCheck); + private Map<String, Long> registerMetrics(Set<String> names) { + if (test || !enableMetrics) { + return new HashMap<>(); + } + try { + if (client == null) { + client = NimbusClient.getConfiguredClient(conf); + } - if (firstTask != null) { - firstTask = taskId; + return client.getClient().registerMetrics(topologyId, names); + } catch (Exception e) { + LOG.error("Failed to gen metric ids", e); + if (client != null) { + client.close(); + client = NimbusClient.getConfiguredClient(conf); } } - HealthCheckRegistry workerHealthCheck = - JStormHealthCheck.getWorkerhealthcheck(); - healthCheck(firstTask, workerHealthCheck); - - + return null; } - @Override - public void run() { + public void shutdown() { + if (!localMode && enableMetrics) { + this.checkMetricMetaThread.cleanup(); + this.flushMetricThread.cleanup(); + } + } + public void doUpload() { + if (test) { + return; + } try { - // TODO Auto-generated method stub - MetricInfo workerMetricInfo = computWorkerMetrics(); - - Map<Integer, MetricInfo> taskMetricMap = computeTaskMetrics(); + long start = System.currentTimeMillis(); + MetricInfo workerMetricInfo = JStormMetrics.computeAllMetrics(); WorkerUploadMetrics upload = new WorkerUploadMetrics(); - upload.set_topology_id(topologyId); - upload.set_supervisor_id(supervisorId); + upload.set_topologyId(topologyId); + upload.set_supervisorId(host); upload.set_port(port); - upload.set_workerMetric(workerMetricInfo); - upload.set_nettyMetric( - new NettyMetric( - JStormMetrics.getExposeNettyMetrics(), - JStormMetrics.getExposeNettyMetrics().size())); - upload.set_taskMetric(taskMetricMap); - - uploadMetric(upload); - - healthCheck(); - - LOG.info("Successfully upload worker's metrics"); - LOG.info(Utils.toPrettyJsonString(workerMetricInfo)); - LOG.info(Utils.toPrettyJsonString(JStormMetrics.getExposeNettyMetrics())); - LOG.info(Utils.toPrettyJsonString(taskMetricMap)); + upload.set_allMetrics(workerMetricInfo); + + if (workerMetricInfo.get_metrics_size() > 0) { + uploadMetric(upload); + LOG.info("Successfully upload worker metrics, size:{}, cost:{}", + workerMetricInfo.get_metrics_size(), System.currentTimeMillis() - start); + } else { + LOG.info("No metrics to upload."); + } } catch (Exception e) { LOG.error("Failed to upload worker metrics", e); } - } - public void uploadMetric(WorkerUploadMetrics upload) { - if (StormConfig.local_mode(conf)) { - try { - byte[] temp = Utils.serialize(upload); - LocalCluster.getInstance().getLocalClusterMap().getNimbus() - .workerUploadMetric(upload); - } catch (TException e) { - // TODO Auto-generated catch block - LOG.error("Failed to upload worker metrics", e); + public void uploadMetric(WorkerUploadMetrics metrics) { + if (isInWorker) { + //in Worker, we upload data via netty transport + if (boltOutput != null) { + LOG.info("emit metrics through bolt collector."); + boltOutput.emit(Common.TOPOLOGY_MASTER_METRICS_STREAM_ID, + new Values(JStormServerUtils.getName(host, port), metrics)); + } else if (spoutOutput != null) { + LOG.info("emit metrics through spout collector."); + spoutOutput.emit(Common.TOPOLOGY_MASTER_METRICS_STREAM_ID, + new Values(JStormServerUtils.getName(host, port), metrics)); } - } else { + }else { + // in supervisor or nimbus, we upload metric data via thrift + LOG.info("emit metrics through nimbus client."); + Update event = new Update(); + TopologyMetric tpMetric = MetricUtils.mkTopologyMetric(); + tpMetric.set_workerMetric(metrics.get_allMetrics()); + + event.topologyMetrics = tpMetric; + event.topologyId = topologyId; + try { - if (client == null) { - client = NimbusClient.getConfiguredClient(conf); - } - client.getClient().workerUploadMetric(upload); - } catch (Exception e) { - LOG.error("Failed to upload worker metrics", e); + if (client == null) { + client = NimbusClient.getConfiguredClient(conf); + } + client.getClient().uploadTopologyMetrics(topologyId, tpMetric); + } catch (Exception ex) { + LOG.error("upload metric error:", ex); if (client != null) { client.close(); - client = null; + client = NimbusClient.getConfiguredClient(conf); } - } finally { - } } + //MetricUtils.logMetrics(metrics.get_allMetrics()); } - @Override - public Object getResult() { - return frequence; - } - - @Override - public void shutdown() { - if (client != null) { - client.close(); - client = null; + + public void setOutputCollector(Object outputCollector) { + if (outputCollector instanceof OutputCollector) { + this.boltOutput = (OutputCollector) outputCollector; + } else if (outputCollector instanceof SpoutOutputCollector) { + this.spoutOutput = (SpoutOutputCollector) outputCollector; } + } - public static class JStormMetricFilter implements MetricFilter { - private static final long serialVersionUID = -8886536175626248855L; - private String[] tags; - public JStormMetricFilter(String[] tags) { - this.tags = tags; + class FlushMetricThread extends RunnableCallback { + @Override + public void run() { + if (TimeUtils.isTimeAligned()) { + int cnt = 0; + try { + for (AsmMetricRegistry registry : JStormMetrics.allRegistries) { + for (Map.Entry<String, AsmMetric> entry : registry.getMetrics().entrySet()) { + entry.getValue().flush(); + cnt++; + } + } + LOG.info("flush metrics, total:{}.", cnt); + + doUpload(); + } catch (Exception ex) { + LOG.error("Error", ex); + } + } + } + + @Override + public Object getResult() { + return flushMetricThreadCycle; } + } + + class CheckMetricMetaThread extends RunnableCallback { + private volatile boolean processing = false; + private final long start = TimeUtils.current_time_secs(); + private final long initialDelay = 30 + new Random().nextInt(15); @Override - public boolean matches(String name, Metric metric) { - // TODO Auto-generated method stub - for (String tag : tags) { - if (name.startsWith(tag)) { - return true; + public void run() { + if (TimeUtils.current_time_secs() - start < initialDelay) { + return; + } + + if (processing) { + LOG.info("still processing, skip..."); + } else { + processing = true; + long start = System.currentTimeMillis(); + try { + Set<String> names = new HashSet<>(); + for (AsmMetricRegistry registry : JStormMetrics.allRegistries) { + Map<String, AsmMetric> metricMap = registry.getMetrics(); + for (Map.Entry<String, AsmMetric> metricEntry : metricMap.entrySet()) { + AsmMetric metric = metricEntry.getValue(); + if (((metric.getOp() & AsmMetric.MetricOp.REPORT) == AsmMetric.MetricOp.REPORT) && + metric.getMetricId() == 0L) { + names.add(metricEntry.getKey()); + } + } + } + + if (names.size() > 0) { + Map<String, Long> nameIdMap = registerMetrics(names); + if (nameIdMap != null) { + for (String name : nameIdMap.keySet()) { + AsmMetric metric = JStormMetrics.find(name); + if (metric != null) { + long id = nameIdMap.get(name); + metric.setMetricId(id); + LOG.info("set metric id, {}:{}", name, id); + } + } + } + LOG.info("register metrics, size:{}, cost:{}", names.size(), System.currentTimeMillis() - start); + } + } catch (Exception ex) { + LOG.error("Error", ex); } + processing = false; } - return false; } + @Override + public Object getResult() { + return checkMetaThreadCycle; + } } - }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/KVSerializable.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/KVSerializable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/KVSerializable.java new file mode 100644 index 0000000..7adaf46 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/KVSerializable.java @@ -0,0 +1,17 @@ +package com.alibaba.jstorm.metric; + +/** + * @author Cody ([email protected]) + * @since 2.0.5 + */ +public interface KVSerializable { + String START = "S", END = "E"; + int LONG_SIZE = 8; + int INT_SIZE = 4; + + public byte[] getKey(); + + public byte[] getValue(); + + public Object fromKV(byte[] key, byte[] value); +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetaFilter.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetaFilter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetaFilter.java new file mode 100644 index 0000000..f39f51c --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetaFilter.java @@ -0,0 +1,11 @@ +package com.alibaba.jstorm.metric; + +import com.alibaba.jstorm.common.metric.MetricMeta; + +/** + * @author Cody ([email protected]) + * @since 2.0.5 + */ +public interface MetaFilter { + boolean matches(MetricMeta meta, Object arg); +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetaType.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetaType.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetaType.java new file mode 100644 index 0000000..3dfe665 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetaType.java @@ -0,0 +1,50 @@ +package com.alibaba.jstorm.metric; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author Cody ([email protected]) + * @since 2.0.5 + */ +public enum MetaType { + TASK(1, "T"), COMPONENT(2, "C"), STREAM(3, "S"), WORKER(4, "W"), TOPOLOGY(5, "P"), NETTY(6, "N"), NIMBUS(7, "M"); + + private int t; + private String v; + + MetaType(int t, String v) { + this.t = t; + this.v = v; + } + + private static final Map<String, MetaType> valueMap = new HashMap<String, MetaType>(); + private static final Map<Integer, MetaType> typeMap = new HashMap<Integer, MetaType>(); + + static { + for (MetaType type : MetaType.values()) { + typeMap.put(type.getT(), type); + valueMap.put(type.getV(), type); + } + } + + public String getV() { + return this.v; + } + + public int getT() { + return t; + } + + public static MetaType parse(char ch) { + return parse(ch + ""); + } + + public static MetaType parse(String v) { + return valueMap.get(v); + } + + public static MetaType parse(int t) { + return typeMap.get(t); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricClient.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricClient.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricClient.java new file mode 100644 index 0000000..1d63f46 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricClient.java @@ -0,0 +1,92 @@ +package com.alibaba.jstorm.metric; + +import backtype.storm.task.TopologyContext; +import com.alibaba.jstorm.common.metric.*; +import com.codahale.metrics.Gauge; + +/** + * metric client for end users to add custom metrics. + * + * @author Cody ([email protected]) + * @since 2.0.5 + */ +@SuppressWarnings("unused") +public class MetricClient { + private static final String GROUP_UDF = "udf"; + + private final String topologyId; + private final String componentId; + private final int taskId; + + public MetricClient(TopologyContext context) { + taskId = context.getThisTaskId(); + this.topologyId = context.getTopologyId(); + this.componentId = context.getThisComponentId(); + } + + public AsmGauge registerGauge(String name, Gauge<Double> gauge) { + return registerGauge(name, GROUP_UDF, gauge); + } + + public AsmGauge registerGauge(String name, String group, Gauge<Double> gauge) { + String userMetricName = getMetricName(name, group, MetricType.GAUGE); + AsmGauge asmGauge = new AsmGauge(gauge); + JStormMetrics.registerTaskMetric(userMetricName, asmGauge); + return asmGauge; + } + + public AsmCounter registerCounter(String name) { + return registerCounter(name, GROUP_UDF); + } + + public AsmCounter registerCounter(String name, String group) { + String userMetricName = getMetricName(name, group, MetricType.COUNTER); + AsmCounter counter = new AsmCounter(); + JStormMetrics.registerTaskMetric(userMetricName, counter); + return counter; + } + + public AsmMeter registerMeter(String name) { + return registerMeter(name, GROUP_UDF); + } + + public AsmMeter registerMeter(String name, String group) { + String userMetricName = getMetricName(name, group, MetricType.METER); + return (AsmMeter) JStormMetrics.registerTaskMetric(userMetricName, new AsmMeter()); + } + + public AsmTimer registerTimer(String name) { + return registerTimer(name, GROUP_UDF); + } + + public AsmTimer registerTimer(String name, String group) { + String userMetricName = getMetricName(name, group, MetricType.TIMER); + return (AsmTimer) JStormMetrics.registerTaskMetric(userMetricName, new AsmTimer()); + } + + public AsmHistogram registerHistogram(String name) { + return registerHistogram(name, GROUP_UDF); + } + + public AsmHistogram registerHistogram(String name, String group) { + String userMetricName = getMetricName(name, group, MetricType.HISTOGRAM); + return (AsmHistogram) JStormMetrics.registerTaskMetric(userMetricName, new AsmHistogram()); + } + + public void unregister(String name, MetricType type) { + unregister(name, GROUP_UDF, type); + } + + public void unregister(String name, String group, MetricType type) { + String userMetricName = getMetricName(name, group, type); + JStormMetrics.unregisterTaskMetric(userMetricName); + } + + private String getMetricName(String name, MetricType type) { + return getMetricName(name, GROUP_UDF, type); + } + + private String getMetricName(String name, String group, MetricType type) { + return MetricUtils.taskMetricName(topologyId, componentId, taskId, group, name, type); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricDataConverter.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricDataConverter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricDataConverter.java new file mode 100644 index 0000000..cdb47f3 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricDataConverter.java @@ -0,0 +1,87 @@ +package com.alibaba.jstorm.metric; + +import backtype.storm.generated.MetricSnapshot; +import com.alibaba.jstorm.common.metric.*; +import com.alibaba.jstorm.utils.TimeUtils; + +import java.util.Date; + +/** + * @author Cody ([email protected]) + * @since 2.0.5 + */ +public class MetricDataConverter { + + public static CounterData toCounterData(MetricSnapshot snapshot, int win) { + CounterData data = new CounterData(); + convertBase(snapshot, data, win); + + data.setV(snapshot.get_longValue()); + return data; + } + + public static GaugeData toGaugeData(MetricSnapshot snapshot, int win) { + GaugeData data = new GaugeData(); + convertBase(snapshot, data, win); + + data.setV(snapshot.get_doubleValue()); + return data; + } + + public static MeterData toMeterData(MetricSnapshot snapshot, int win) { + MeterData data = new MeterData(); + convertBase(snapshot, data, win); + + data.setM1(snapshot.get_m1()); + data.setM5(snapshot.get_m5()); + data.setM15(snapshot.get_m15()); + data.setMean(snapshot.get_mean()); + + return data; + } + + public static HistogramData toHistogramData(MetricSnapshot snapshot, int win) { + HistogramData data = new HistogramData(); + convertBase(snapshot, data, win); + + data.setMin(snapshot.get_min()); + data.setMax(snapshot.get_max()); + data.setP50(snapshot.get_p50()); + data.setP75(snapshot.get_p75()); + data.setP95(snapshot.get_p95()); + data.setP98(snapshot.get_p98()); + data.setP99(snapshot.get_p99()); + data.setP999(snapshot.get_p999()); + data.setMean(snapshot.get_mean()); + + return data; + } + + public static TimerData toTimerData(MetricSnapshot snapshot, int win) { + TimerData data = new TimerData(); + convertBase(snapshot, data, win); + + data.setMin(snapshot.get_min()); + data.setMax(snapshot.get_max()); + data.setP50(snapshot.get_p50()); + data.setP75(snapshot.get_p75()); + data.setP95(snapshot.get_p95()); + data.setP98(snapshot.get_p98()); + data.setP99(snapshot.get_p99()); + data.setP999(snapshot.get_p999()); + data.setMean(snapshot.get_mean()); + data.setM1(snapshot.get_m1()); + data.setM5(snapshot.get_m5()); + data.setM15(snapshot.get_m15()); + + return data; + } + + private static void convertBase(MetricSnapshot snapshot, MetricBaseData data, int win) { + long newTs = TimeUtils.alignTimeToWin(snapshot.get_ts(), win); + data.setWin(win); + data.setMetricId(snapshot.get_metricId()); + data.setTs(new Date(newTs)); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricDef.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricDef.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricDef.java index 58413bb..c060e51 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricDef.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricDef.java @@ -17,9 +17,7 @@ */ package com.alibaba.jstorm.metric; -import java.util.ArrayList; import java.util.HashSet; -import java.util.List; import java.util.Set; public class MetricDef { @@ -28,10 +26,8 @@ public class MetricDef { public static final String TIME_TYPE = "Time"; public static final String DESERIALIZE_THREAD = "Deserialize"; - public static final String DESERIALIZE_QUEUE = DESERIALIZE_THREAD - + QUEUE_TYPE; - public static final String DESERIALIZE_TIME = DESERIALIZE_THREAD - + TIME_TYPE; + public static final String DESERIALIZE_QUEUE = DESERIALIZE_THREAD + QUEUE_TYPE; + public static final String DESERIALIZE_TIME = DESERIALIZE_THREAD + TIME_TYPE; public static final String SERIALIZE_THREAD = "Serialize"; public static final String SERIALIZE_QUEUE = SERIALIZE_THREAD + QUEUE_TYPE; @@ -45,54 +41,46 @@ public class MetricDef { public static final String EMPTY_CPU_RATIO = "EmptyCpuRatio"; public static final String PENDING_MAP = "PendingNum"; public static final String COLLECTOR_EMIT_TIME = "EmitTime"; - - + public static final String TUPLE_LIEF_CYCLE = "TupleLifeCycle"; public static final String DISPATCH_THREAD = "VirtualPortDispatch"; public static final String DISPATCH_QUEUE = DISPATCH_THREAD + QUEUE_TYPE; public static final String DISPATCH_TIME = DISPATCH_THREAD + TIME_TYPE; public static final String BATCH_DRAINER_THREAD = "BatchDrainer"; - public static final String BATCH_DRAINER_QUEUE = BATCH_DRAINER_THREAD - + QUEUE_TYPE; - public static final String BATCH_DRAINER_TIME = BATCH_DRAINER_THREAD - + TIME_TYPE; + public static final String BATCH_DRAINER_QUEUE = BATCH_DRAINER_THREAD + QUEUE_TYPE; + public static final String BATCH_DRAINER_TIME = BATCH_DRAINER_THREAD + TIME_TYPE; public static final String DRAINER_THREAD = "Drainer"; public static final String DRAINER_QUEUE = DRAINER_THREAD + QUEUE_TYPE; public static final String DRAINER_TIME = DRAINER_THREAD + TIME_TYPE; - public static final String NETWORK_MSG_DECODE_TIME = "NetworkMsgDecodeTime"; - + // all tag start with "Netty" will be specially display in Web UI public static final String NETTY = "Netty"; public static final String NETTY_CLI = NETTY + "Client"; public static final String NETTY_SRV = NETTY + "Server"; public static final String NETTY_CLI_SEND_SPEED = NETTY_CLI + "SendSpeed"; public static final String NETTY_SRV_RECV_SPEED = NETTY_SRV + "RecvSpeed"; - + public static final String NETTY_CLI_SEND_TIME = NETTY_CLI + "SendTime"; - public static final String NETTY_CLI_BATCH_SIZE = - NETTY_CLI + "SendBatchSize"; - public static final String NETTY_CLI_SEND_PENDING = - NETTY_CLI + "SendPendings"; - public static final String NETTY_CLI_SYNC_BATCH_QUEUE = - NETTY_CLI + "SyncBatchQueue"; - public static final String NETTY_CLI_SYNC_DISR_QUEUE = - NETTY_CLI + "SyncDisrQueue"; + public static final String NETTY_CLI_BATCH_SIZE = NETTY_CLI + "SendBatchSize"; + public static final String NETTY_CLI_SEND_PENDING = NETTY_CLI + "SendPendings"; + public static final String NETTY_CLI_SYNC_BATCH_QUEUE = NETTY_CLI + "SyncBatchQueue"; + public static final String NETTY_CLI_SYNC_DISR_QUEUE = NETTY_CLI + "SyncDisrQueue"; public static final String NETTY_CLI_CACHE_SIZE = NETTY_CLI + "CacheSize"; public static final String NETTY_CLI_CONNECTION = NETTY_CLI + "ConnectionCheck"; - + // metric name for worker public static final String NETTY_SRV_MSG_TRANS_TIME = NETTY_SRV + "TransmitTime"; - public static final String ZMQ_SEND_TIME = "ZMQSendTime"; public static final String ZMQ_SEND_MSG_SIZE = "ZMQSendMSGSize"; public static final String CPU_USED_RATIO = "CpuUsedRatio"; public static final String MEMORY_USED = "MemoryUsed"; + public static final String DISK_USAGE = "DiskUsage"; public static final String REMOTE_CLI_ADDR = "RemoteClientAddress"; public static final String REMOTE_SERV_ADDR = "RemoteServerAddress"; @@ -105,10 +93,10 @@ public class MetricDef { public static final String PROCESS_LATENCY = "ProcessLatency"; public static final String[] OUTPUT_TAG = { EMMITTED_NUM, SEND_TPS }; - public static final String[] INPUT_TAG = { RECV_TPS, ACKED_NUM, FAILED_NUM, - PROCESS_LATENCY }; + public static final String[] INPUT_TAG = { RECV_TPS, ACKED_NUM, FAILED_NUM, PROCESS_LATENCY }; public static final Set<String> MERGE_SUM_TAG = new HashSet<String>(); + static { MERGE_SUM_TAG.add(MetricDef.EMMITTED_NUM); MERGE_SUM_TAG.add(MetricDef.SEND_TPS); @@ -119,6 +107,7 @@ public class MetricDef { } public static final Set<String> MERGE_AVG_TAG = new HashSet<String>(); + static { MERGE_AVG_TAG.add(PROCESS_LATENCY); } @@ -128,6 +117,7 @@ public class MetricDef { public static final String QEUEU_IS_FULL = "queue is full"; public static final Set<String> TASK_QUEUE_SET = new HashSet<String>(); + static { TASK_QUEUE_SET.add(DESERIALIZE_QUEUE); TASK_QUEUE_SET.add(SERIALIZE_QUEUE); @@ -136,27 +126,28 @@ public class MetricDef { } public static final Set<String> WORKER_QUEUE_SET = new HashSet<String>(); + static { WORKER_QUEUE_SET.add(DISPATCH_QUEUE); WORKER_QUEUE_SET.add(BATCH_DRAINER_QUEUE); WORKER_QUEUE_SET.add(DRAINER_QUEUE); } - - + public static final int NETTY_METRICS_PACKAGE_SIZE = 200; + public static boolean isNettyDetails(String metricName) { - + Set<String> specialNettySet = new HashSet<String>(); specialNettySet.add(MetricDef.NETTY_CLI_SEND_SPEED); specialNettySet.add(MetricDef.NETTY_SRV_RECV_SPEED); - + if (specialNettySet.contains(metricName)) { return false; } if (metricName.startsWith(MetricDef.NETTY)) { return true; } - + return false; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricIDGenerator.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricIDGenerator.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricIDGenerator.java new file mode 100644 index 0000000..3df7bab --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricIDGenerator.java @@ -0,0 +1,9 @@ +package com.alibaba.jstorm.metric; + +/** + * @author Cody ([email protected]) + * @since 2.0.5 + */ +public interface MetricIDGenerator { + long genMetricId(String metricName); +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricJstack.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricJstack.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricJstack.java index 2989196..16e6357 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricJstack.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricJstack.java @@ -55,8 +55,7 @@ public class MetricJstack implements Gauge<String> { writer.append("\n"); } - long[] deadLockMonitorTids = - threadMXBean.findMonitorDeadlockedThreads(); + long[] deadLockMonitorTids = threadMXBean.findMonitorDeadlockedThreads(); if (deadLockMonitorTids != null) { writer.append(threadIds.length + " deadlocked monitor threads:"); for (long tid : deadLockMonitorTids) { @@ -66,60 +65,38 @@ public class MetricJstack implements Gauge<String> { } for (long tid : threadIds) { - ThreadInfo info = - threadMXBean.getThreadInfo(tid, Integer.MAX_VALUE); + ThreadInfo info = threadMXBean.getThreadInfo(tid, Integer.MAX_VALUE); if (info == null) { writer.append(" Inactive").append("\n"); continue; } - writer.append( - "Thread " - + getTaskName(info.getThreadId(), - info.getThreadName()) + ":").append("\n"); + writer.append("Thread " + getTaskName(info.getThreadId(), info.getThreadName()) + ":").append("\n"); Thread.State state = info.getThreadState(); writer.append(" State: " + state).append("\n"); - writer.append(" Blocked count: " + info.getBlockedCount()).append( - "\n"); - writer.append(" Waited count: " + info.getWaitedCount()).append( - "\n"); - writer.append(" Cpu time:") - .append(threadMXBean.getThreadCpuTime(tid) / 1000000) - .append("ms").append("\n"); - writer.append(" User time:") - .append(threadMXBean.getThreadUserTime(tid) / 1000000) - .append("ms").append("\n"); + writer.append(" Blocked count: " + info.getBlockedCount()).append("\n"); + writer.append(" Waited count: " + info.getWaitedCount()).append("\n"); + writer.append(" Cpu time:").append(threadMXBean.getThreadCpuTime(tid) / 1000000).append("ms").append("\n"); + writer.append(" User time:").append(threadMXBean.getThreadUserTime(tid) / 1000000).append("ms").append("\n"); if (contention) { - writer.append(" Blocked time: " + info.getBlockedTime()) - .append("\n"); - writer.append(" Waited time: " + info.getWaitedTime()).append( - "\n"); + writer.append(" Blocked time: " + info.getBlockedTime()).append("\n"); + writer.append(" Waited time: " + info.getWaitedTime()).append("\n"); } if (state == Thread.State.WAITING) { - writer.append(" Waiting on " + info.getLockName()) - .append("\n"); + writer.append(" Waiting on " + info.getLockName()).append("\n"); } else if (state == Thread.State.BLOCKED) { - writer.append(" Blocked on " + info.getLockName()) - .append("\n"); - writer.append( - " Blocked by " - + getTaskName(info.getLockOwnerId(), - info.getLockOwnerName())).append("\n"); + writer.append(" Blocked on " + info.getLockName()).append("\n"); + writer.append(" Blocked by " + getTaskName(info.getLockOwnerId(), info.getLockOwnerName())).append("\n"); } } for (long tid : threadIds) { - ThreadInfo info = - threadMXBean.getThreadInfo(tid, Integer.MAX_VALUE); + ThreadInfo info = threadMXBean.getThreadInfo(tid, Integer.MAX_VALUE); if (info == null) { writer.append(" Inactive").append("\n"); continue; } - writer.append( - "Thread " - + getTaskName(info.getThreadId(), - info.getThreadName()) + ": Stack").append( - "\n"); + writer.append("Thread " + getTaskName(info.getThreadId(), info.getThreadName()) + ": Stack").append("\n"); for (StackTraceElement frame : info.getStackTrace()) { writer.append(" " + frame.toString()).append("\n"); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricQueryClient.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricQueryClient.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricQueryClient.java new file mode 100644 index 0000000..c8a8f56 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricQueryClient.java @@ -0,0 +1,148 @@ +package com.alibaba.jstorm.metric; + +import com.alibaba.jstorm.common.metric.MetricMeta; +import com.alibaba.jstorm.common.metric.TaskTrack; +import com.alibaba.jstorm.common.metric.TopologyHistory; + +import java.util.List; +import java.util.Map; + +/** + * metric query client for getting metric meta & data + * + * @author Cody ([email protected]) + * @since 2.0.5 + */ +public interface MetricQueryClient { + + /** + * init metric query client + */ + void init(Map conf); + + /** + * get metric meta with optional meta filter + * + * @param clusterName cluster name + * @param topologyId topology id + * @param type meta type + * @param filter meta filter, if filter matches, the corresponding meta will be returned. + * @param arg filter argument + * @return meta list + */ + List<MetricMeta> getMetricMeta(String clusterName, String topologyId, MetaType type, MetaFilter filter, Object arg); + + /** + * get metric meta by topology id and meta type + * + * @param clusterName cluster name + * @param topologyId topology id + * @param type meta type + * @return all metric meta + */ + List<MetricMeta> getMetricMeta(String clusterName, String topologyId, MetaType type); + + /** + * get worker metric meta by topology id + * + * @param clusterName cluster name + * @param topologyId topology id + * @return all worker metric meta + */ + List<MetricMeta> getWorkerMeta(String clusterName, String topologyId); + + /** + * get netty metric meta by topology id + * + * @param clusterName cluster name + * @param topologyId topology id + * @return all netty metric meta + */ + List<MetricMeta> getNettyMeta(String clusterName, String topologyId); + + /** + * get task metric meta + * + * @param clusterName cluster name + * @param topologyId topology id + * @param taskId task id + * @return task metric meta + */ + List<MetricMeta> getTaskMeta(String clusterName, String topologyId, int taskId); + + /** + * get component metric meta + * + * @param clusterName cluster name + * @param topologyId topology id + * @param componentId component id + * @return component metric meta + */ + List<MetricMeta> getComponentMeta(String clusterName, String topologyId, String componentId); + + /** + * get metric meta by id + * + * @param clusterName cluster name + * @param topologyId topology id + * @param metaType meta type + * @param metricId metric id + * @return metric meta + */ + MetricMeta getMetricMeta(String clusterName, String topologyId, MetaType metaType, long metricId); + + /** + * get metric data + * + * @param metricId metric id + * @param metricType metric type + * @param win metric window + * @param start start time + * @param end end time + * @return metric data objects, depending on metric type, could be CounterData, GaugeData, ... etc. + */ + List<Object> getMetricData(long metricId, MetricType metricType, int win, long start, long end); + + /** + * get all task track by topology id + * + * @param clusterName cluster name + * @param topologyId topology id + * @return task track + */ + List<TaskTrack> getTaskTrack(String clusterName, String topologyId); + + /** + * get task track by task id + * + * @param clusterName cluster name + * @param topologyId topology id + * @param taskId task id + * @return task track + */ + List<TaskTrack> getTaskTrack(String clusterName, String topologyId, int taskId); + + /** + * get topology history + * + * @param clusterName cluster name + * @param topologyName topology name + * @param size size + * @return topology history list + */ + List<TopologyHistory> getTopologyHistory(String clusterName, String topologyName, int size); + + /** + * delete metrics meta. note that clusterName, topologyId, metaType & id must be set. + * + * @param meta metric meta + */ + void deleteMeta(MetricMeta meta); + + /** + * delete metrics meta list. note that clusterName, topologyId, metaType & id must be set. + * + * @param metaList metric meta list + */ + void deleteMeta(List<MetricMeta> metaList); +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricSendClient.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricSendClient.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricSendClient.java deleted file mode 100755 index e1313f6..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricSendClient.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.alibaba.jstorm.metric; - -import java.util.List; -import java.util.Map; - -public class MetricSendClient { - - public MetricSendClient() { - } - - public boolean send(Map<String, Object> msg) { - return true; - } - - public boolean send(List<Map<String, Object>> msgList) { - return true; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricThrift.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricThrift.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricThrift.java deleted file mode 100755 index 5286a1c..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricThrift.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.metric; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.generated.MetricInfo; -import backtype.storm.generated.MetricWindow; - -import com.alibaba.jstorm.utils.JStormUtils; - -public class MetricThrift { - private static final Logger LOG = LoggerFactory - .getLogger(MetricThrift.class); - - public static MetricInfo mkMetricInfo() { - MetricInfo metricInfo = - new MetricInfo(new HashMap<String, MetricWindow>()); - - return metricInfo; - } - - public static void insert(MetricInfo metricInfo, String key, - Map<Integer, Double> windowSet) { - MetricWindow metricWindow = new MetricWindow(); - metricWindow.set_metricWindow(windowSet); - - metricInfo.put_to_baseMetric(key, metricWindow); - - } - - public static MetricWindow merge(Map<String, MetricWindow> details) { - Map<Integer, Double> merge = new HashMap<Integer, Double>(); - - for (Entry<String, MetricWindow> entry : details.entrySet()) { - MetricWindow metricWindow = entry.getValue(); - Map<Integer, Double> metric = metricWindow.get_metricWindow(); - - for (Entry<Integer, Double> metricEntry : metric.entrySet()) { - Integer key = metricEntry.getKey(); - try { - Double value = - ((Number) JStormUtils.add(metricEntry.getValue(), - merge.get(key))).doubleValue(); - - merge.put(key, value); - } catch (Exception e) { - LOG.error("Invalid type of " + entry.getKey() + ":" + key, - e); - continue; - } - } - } - - MetricWindow ret = new MetricWindow(); - - ret.set_metricWindow(merge); - return ret; - } - - public static void merge(MetricInfo metricInfo, - Map<String, Map<String, MetricWindow>> extraMap) { - for (Entry<String, Map<String, MetricWindow>> entry : extraMap - .entrySet()) { - String metricName = entry.getKey(); - - metricInfo.put_to_baseMetric(metricName, merge(entry.getValue())); - - } - } - - public static MetricWindow mergeMetricWindow(MetricWindow fromMetric, - MetricWindow toMetric) { - if (toMetric == null) { - toMetric = new MetricWindow(new HashMap<Integer, Double>()); - } - - if (fromMetric == null) { - return toMetric; - } - - List<Map<Integer, Double>> list = new ArrayList<Map<Integer, Double>>(); - list.add(fromMetric.get_metricWindow()); - list.add(toMetric.get_metricWindow()); - Map<Integer, Double> merged = JStormUtils.mergeMapList(list); - - toMetric.set_metricWindow(merged); - - return toMetric; - } - - public static MetricInfo mergeMetricInfo(MetricInfo from, MetricInfo to) { - if (to == null) { - to = mkMetricInfo(); - } - - if (from == null) { - return to; - } - - to.get_baseMetric().putAll(from.get_baseMetric()); - - return to; - - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricType.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricType.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricType.java new file mode 100644 index 0000000..0203f9e --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricType.java @@ -0,0 +1,50 @@ +package com.alibaba.jstorm.metric; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author Cody ([email protected]) + * @since 2.0.5 + */ +public enum MetricType { + COUNTER("C", 1), GAUGE("G", 2), METER("M", 3), HISTOGRAM("H", 4), TIMER("T", 5); + + private String v; + private int t; + + MetricType(String v, int t) { + this.v = v; + this.t = t; + } + + public int getT() { + return this.t; + } + + public String getV() { + return this.v; + } + + private static final Map<String, MetricType> valueMap = new HashMap<String, MetricType>(); + private static final Map<Integer, MetricType> typeMap = new HashMap<Integer, MetricType>(); + + static { + for (MetricType type : MetricType.values()) { + typeMap.put(type.getT(), type); + valueMap.put(type.getV(), type); + } + } + + public static MetricType parse(char ch) { + return parse(ch + ""); + } + + public static MetricType parse(String s) { + return valueMap.get(s); + } + + public static MetricType parse(int t) { + return typeMap.get(t); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricUtils.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricUtils.java new file mode 100644 index 0000000..e76effd --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/MetricUtils.java @@ -0,0 +1,600 @@ +package com.alibaba.jstorm.metric; + +import backtype.storm.generated.MetricInfo; +import backtype.storm.generated.MetricSnapshot; +import backtype.storm.generated.TopologyMetric; +import com.alibaba.jstorm.client.ConfigExtension; +import com.alibaba.jstorm.common.metric.AsmMetric; +import com.alibaba.jstorm.common.metric.snapshot.*; +import com.alibaba.jstorm.utils.JStormUtils; +import com.alibaba.jstorm.utils.TimeUtils; +import com.codahale.metrics.*; +import com.codahale.metrics.Timer; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +/** + * @author Cody ([email protected]) + * @since 2.0.5 + */ +public class MetricUtils { + private static final Logger LOG = LoggerFactory.getLogger(MetricUtils.class); + + public static final char AT = '@'; + public static final String DELIM = AT + ""; + public static final String EMPTY = ""; + public static final String DEFAULT_GROUP = "sys"; + + public static final int MAX_POINTS_PER_WORKER = 200; + public static final int NETTY_METRIC_PAGE_SIZE = 200; + + public static boolean isValidId(long metricId) { + return metricId != 0; + } + + public static MetricInfo mkMetricInfo() { + MetricInfo ret = new MetricInfo(); + ret.set_metrics(new HashMap<String, Map<Integer, MetricSnapshot>>()); + + return ret; + } + + public static TopologyMetric mkTopologyMetric() { + TopologyMetric emptyTopologyMetric = new TopologyMetric(); + + emptyTopologyMetric.set_topologyMetric(new MetricInfo()); + emptyTopologyMetric.set_componentMetric(new MetricInfo()); + emptyTopologyMetric.set_workerMetric(new MetricInfo()); + emptyTopologyMetric.set_taskMetric(new MetricInfo()); + emptyTopologyMetric.set_streamMetric(new MetricInfo()); + emptyTopologyMetric.set_nettyMetric(new MetricInfo()); + + return emptyTopologyMetric; + } + + public static boolean isEnableNettyMetrics(Map stormConf) { + int maxWorkerNumForNetty = ConfigExtension.getTopologyMaxWorkerNumForNettyMetrics(stormConf); + int workerNum = JStormUtils.parseInt(stormConf.get("topology.workers"), 1); + return workerNum < maxWorkerNumForNetty; + } + + /** + * a metric name composites of: type@topologyId@componentId@taskId@streamId@group@name for non-worker metrics OR type@topologyId@host@port@group@name for + * worker metrics + */ + public static String metricName(String type, String topologyId, String componentId, int taskId, String streamId, String group, String name) { + return concat(type, topologyId, componentId, taskId, streamId, group, name); + } + + public static String streamMetricName(String topologyId, String componentId, int taskId, String streamId, String name, MetricType type) { + return concat(MetaType.STREAM.getV() + type.getV(), topologyId, componentId, taskId, streamId, DEFAULT_GROUP, name); + } + + public static String workerMetricName(String topologyId, String host, int port, String name, MetricType type) { + return concat(MetaType.WORKER.getV() + type.getV(), topologyId, host, port, DEFAULT_GROUP, name); + } + + public static String workerMetricName(String name, MetricType type) { + return concat(MetaType.WORKER.getV() + type.getV(), EMPTY, EMPTY, 0, DEFAULT_GROUP, name); + } + + public static String nettyMetricName(String name, MetricType type) { + return concat(MetaType.NETTY.getV() + type.getV(), EMPTY, EMPTY, 0, JStormMetrics.NETTY_GROUP, name); + } + + public static String workerMetricPrefix(String topologyId, String host, int port) { + return concat(MetaType.WORKER.getV(), topologyId, host, port); + } + + public static String taskMetricName(String topologyId, String componentId, int taskId, String name, MetricType type) { + return concat(MetaType.TASK.getV() + type.getV(), topologyId, componentId, taskId, EMPTY, DEFAULT_GROUP, name); + } + + public static String taskMetricName(String topologyId, String componentId, int taskId, String group, String name, MetricType type) { + return concat(MetaType.TASK.getV() + type.getV(), topologyId, componentId, taskId, EMPTY, group, name); + } + + public static String compMetricName(String topologyId, String componentId, String name, MetricType type) { + return concat(MetaType.COMPONENT.getV() + type.getV(), topologyId, componentId, 0, EMPTY, DEFAULT_GROUP, name); + } + + public static String removeDelimIfPossible(String name) { + if (name.contains(DELIM)) { + return name.replace(DELIM, EMPTY); + } + return name; + } + + public static MetaType metaType(String name) { + return MetaType.parse(name.charAt(0) + EMPTY); + } + + public static MetricType metricType(String name) { + return MetricType.parse(name.charAt(1) + EMPTY); + } + + /** + * make streamId empty, remain other parts the same + */ + public static String stream2taskName(String old) { + String[] parts = old.split(DELIM); + if (parts.length >= 7) { + parts[0] = MetaType.TASK.getV() + parts[0].charAt(1); + parts[parts.length - 3] = EMPTY; + } + return concat(parts); + } + + /** + * make taskId=0 and streamId empty. + */ + public static String task2compName(String old) { + String[] parts = old.split(DELIM); + if (parts.length >= 7) { + parts[0] = MetaType.COMPONENT.getV() + parts[0].charAt(1); + parts[parts.length - 3] = EMPTY; + parts[parts.length - 4] = "0"; + } + return concat(parts); + } + + /** + * make taskId=0 and streamId empty and metricName remain the string after `.`. + */ + public static String task2MergeCompName(String old) { + String[] parts = old.split(DELIM); + if (parts.length >= 7) { + parts[0] = MetaType.COMPONENT.getV() + parts[0].charAt(1); + parts[parts.length - 3] = EMPTY; + parts[parts.length - 4] = "0"; + + String metricName = parts[parts.length - 1]; + int dotIndex = metricName.indexOf("."); + if (dotIndex != -1){ + metricName = metricName.substring(dotIndex+1); + parts[parts.length - 1] = metricName; + } + } + return concat(parts); + } + + /** + * change component metric name to worker metric name, only for topology metrics + */ + public static String comp2topologyName(String old) { + String[] parts = old.split(DELIM); + parts[0] = MetaType.TOPOLOGY.getV() + parts[0].charAt(1); + // type + topologyId + host + port + group + name + return concat(parts[0], parts[1], EMPTY, "0", parts[5], parts[6]); + } + + public static String worker2topologyName(String old) { + String[] parts = old.split(DELIM); + if (parts.length >= 5) { + parts[0] = MetaType.TOPOLOGY.getV() + parts[0].charAt(1); + parts[2] = EMPTY; // host + parts[3] = "0"; // port + } + return concat(parts); + } + + public static String topo2clusterName(String old){ + String[] parts = old.split(DELIM); + parts[1] = JStormMetrics.CLUSTER_METRIC_KEY; + return concat(parts); + } + + public static String concat(Object... args) { + StringBuilder sb = new StringBuilder(50); + int last = args.length - 1; + if (args[last] instanceof String) { + args[last] = removeDelimIfPossible((String) args[last]); + } + for (Object arg : args) { + sb.append(arg).append(DELIM); + } + sb.deleteCharAt(sb.length() - 1); + return sb.toString(); + } + + public static String concat2(Object... args) { + StringBuilder sb = new StringBuilder(50); + for (Object arg : args) { + sb.append(arg).append(DELIM); + } + sb.deleteCharAt(sb.length() - 1); + return sb.toString(); + } + + public static String concat3(String delim, Object... args) { + StringBuilder sb = new StringBuilder(50); + for (Object arg : args) { + sb.append(arg).append(delim); + } + sb.deleteCharAt(sb.length() - 1); + return sb.toString(); + } + + public static Histogram metricSnapshot2Histogram(MetricSnapshot snapshot) { + Histogram histogram = new Histogram(new ExponentiallyDecayingReservoir()); + List<Long> points = snapshot.get_points(); + updateHistogramPoints(histogram, points); + return histogram; + } + + public static Timer metricSnapshot2Timer(MetricSnapshot snapshot) { + Timer timer = new Timer(new ExponentiallyDecayingReservoir()); + List<Long> points = snapshot.get_points(); + updateTimerPoints(timer, points); + return timer; + } + + public static void updateHistogramPoints(Histogram histogram, List<Long> points) { + if (points != null) { + for (Long pt : points) { + histogram.update(pt); + } + } + } + + public static void updateTimerPoints(Timer timer, List<Long> points) { + if (points != null) { + for (Long pt : points) { + timer.update(pt, TimeUnit.MILLISECONDS); + } + } + } + + public static Map<Integer, MetricSnapshot> toThriftCounterSnapshots(Map<Integer, AsmSnapshot> snapshots) { + Map<Integer, MetricSnapshot> ret = Maps.newHashMapWithExpectedSize(snapshots.size()); + for (Map.Entry<Integer, AsmSnapshot> entry : snapshots.entrySet()) { + ret.put(entry.getKey(), convert((AsmCounterSnapshot) entry.getValue())); + } + return ret; + } + + public static Map<Integer, MetricSnapshot> toThriftGaugeSnapshots(Map<Integer, AsmSnapshot> snapshots) { + Map<Integer, MetricSnapshot> ret = Maps.newHashMapWithExpectedSize(snapshots.size()); + for (Map.Entry<Integer, AsmSnapshot> entry : snapshots.entrySet()) { + ret.put(entry.getKey(), convert((AsmGaugeSnapshot) entry.getValue())); + } + return ret; + } + + public static Map<Integer, MetricSnapshot> toThriftMeterSnapshots(Map<Integer, AsmSnapshot> snapshots) { + Map<Integer, MetricSnapshot> ret = Maps.newHashMapWithExpectedSize(snapshots.size()); + for (Map.Entry<Integer, AsmSnapshot> entry : snapshots.entrySet()) { + ret.put(entry.getKey(), convert((AsmMeterSnapshot) entry.getValue())); + } + return ret; + } + + public static Map<Integer, MetricSnapshot> toThriftHistoSnapshots(MetaType metaType, Map<Integer, AsmSnapshot> snapshots) { + Map<Integer, MetricSnapshot> ret = Maps.newHashMapWithExpectedSize(snapshots.size()); + for (Map.Entry<Integer, AsmSnapshot> entry : snapshots.entrySet()) { + MetricSnapshot histogramSnapshot = convert(metaType, (AsmHistogramSnapshot) entry.getValue()); + if (histogramSnapshot != null) { + ret.put(entry.getKey(), histogramSnapshot); + } + } + return ret; + } + + public static Map<Integer, MetricSnapshot> toThriftTimerSnapshots(MetaType metaType, Map<Integer, AsmSnapshot> snapshots) { + Map<Integer, MetricSnapshot> ret = Maps.newHashMapWithExpectedSize(snapshots.size()); + for (Map.Entry<Integer, AsmSnapshot> entry : snapshots.entrySet()) { + MetricSnapshot timerSnapshot = convert(metaType, (AsmTimerSnapshot) entry.getValue()); + if (timerSnapshot != null) { + ret.put(entry.getKey(), timerSnapshot); + } + } + return ret; + } + + public static MetricSnapshot convert(AsmCounterSnapshot snapshot) { + MetricSnapshot ret = new MetricSnapshot(); + ret.set_metricId(snapshot.getMetricId()); + ret.set_ts(TimeUtils.alignTimeToMin(snapshot.getTs())); + ret.set_metricType(MetricType.COUNTER.getT()); + ret.set_longValue(snapshot.getV()); + + return ret; + } + + public static MetricSnapshot convert(AsmGaugeSnapshot snapshot) { + MetricSnapshot ret = new MetricSnapshot(); + ret.set_metricId(snapshot.getMetricId()); + ret.set_ts(TimeUtils.alignTimeToMin(snapshot.getTs())); + ret.set_metricType(MetricType.GAUGE.getT()); + ret.set_doubleValue(snapshot.getV()); + + return ret; + } + + public static MetricSnapshot convert(AsmMeterSnapshot snapshot) { + MetricSnapshot ret = new MetricSnapshot(); + ret.set_metricId(snapshot.getMetricId()); + ret.set_ts(TimeUtils.alignTimeToMin(snapshot.getTs())); + ret.set_metricType(MetricType.METER.getT()); + + ret.set_m1(snapshot.getM1()); + ret.set_m5(snapshot.getM5()); + ret.set_m15(snapshot.getM15()); + ret.set_mean(snapshot.getMean()); + + return ret; + } + + public static MetricSnapshot convert(MetaType metaType, AsmHistogramSnapshot snapshot) { + // some histograms are never updated, skip such metrics + //if (snapshot.getSnapshot().getValues().length == 0) { + // return null; + //} + + MetricSnapshot ret = new MetricSnapshot(); + ret.set_metricId(snapshot.getMetricId()); + ret.set_ts(TimeUtils.alignTimeToMin(snapshot.getTs())); + ret.set_metricType(MetricType.HISTOGRAM.getT()); + + Snapshot ws = snapshot.getSnapshot(); + ret.set_min(ws.getMin()); + ret.set_max(ws.getMax()); + ret.set_p50(ws.getMedian()); + ret.set_p75(ws.get75thPercentile()); + ret.set_p95(ws.get95thPercentile()); + ret.set_p98(ws.get98thPercentile()); + ret.set_p99(ws.get99thPercentile()); + ret.set_p999(ws.get999thPercentile()); + ret.set_mean(ws.getMean()); + ret.set_stddev(ws.getStdDev()); + + // only upload points for component metrics + if (metaType == MetaType.COMPONENT || metaType == MetaType.TOPOLOGY) { + List<Long> pts = Lists.newArrayListWithCapacity(ws.getValues().length); + for (Long pt : ws.getValues()) { + pts.add(pt); + } + ret.set_points(pts); + } else { + ret.set_points(new ArrayList<Long>(0)); + } + + return ret; + } + + public static MetricSnapshot convert(MetaType metaType, AsmTimerSnapshot snapshot) { + // some histograms are never updated, skip such metrics + /* + if (snapshot.getHistogram().getValues().length == 0) { + return null; + } + */ + + MetricSnapshot ret = new MetricSnapshot(); + ret.set_metricId(snapshot.getMetricId()); + ret.set_ts(TimeUtils.alignTimeToMin(snapshot.getTs())); + ret.set_metricType(MetricType.TIMER.getT()); + + Snapshot ws = snapshot.getHistogram(); + ret.set_min(ws.getMin()); + ret.set_max(ws.getMax()); + ret.set_p50(ws.getMedian()); + ret.set_p75(ws.get75thPercentile()); + ret.set_p95(ws.get95thPercentile()); + ret.set_p98(ws.get98thPercentile()); + ret.set_p99(ws.get99thPercentile()); + ret.set_p999(ws.get999thPercentile()); + ret.set_mean(ws.getMean()); + ret.set_stddev(ws.getStdDev()); + + AsmMeterSnapshot ms = snapshot.getMeter(); + ret.set_m1(ms.getM1()); + ret.set_m15(ms.getM5()); + ret.set_m15(ms.getM15()); + + // only upload points for component metrics + if (metaType == MetaType.COMPONENT || metaType == MetaType.TOPOLOGY) { + List<Long> pts = Lists.newArrayListWithCapacity(ws.getValues().length); + for (Long pt : ws.getValues()) { + pts.add(pt); + } + ret.set_points(pts); + } else { + ret.set_points(new ArrayList<Long>(0)); + } + + return ret; + } + + public static String getMetricName(String fullName) { + String[] parts = fullName.split(DELIM); + return parts[parts.length - 1]; + } + + public static String str(Object obj) { + if (obj instanceof MetricSnapshot) { + MetricSnapshot snapshot = (MetricSnapshot) obj; + MetricType type = MetricType.parse(snapshot.get_metricType()); + if (type == MetricType.COUNTER) { + return counterStr(snapshot); + } else if (type == MetricType.GAUGE) { + return gaugeStr(snapshot); + } else if (type == MetricType.METER) { + return meterStr(snapshot); + } else if (type == MetricType.HISTOGRAM) { + return histogramStr(snapshot); + } else if (type == MetricType.TIMER) { + return timerStr(snapshot); + } + } + return obj.toString(); + } + + public static String counterStr(MetricSnapshot snapshot) { + StringBuilder sb = new StringBuilder(32); + sb.append("id:").append(snapshot.get_metricId()).append(",v:").append(snapshot.get_longValue()); + + return sb.toString(); + } + + public static String gaugeStr(MetricSnapshot snapshot) { + StringBuilder sb = new StringBuilder(32); + sb.append("id:").append(snapshot.get_metricId()).append(",v:").append(snapshot.get_doubleValue()); + + return sb.toString(); + } + + public static String meterStr(MetricSnapshot snapshot) { + StringBuilder sb = new StringBuilder(50); + sb.append("id:").append(snapshot.get_metricId()); + sb.append(",m1:").append(snapshot.get_m1()).append(",").append("m5:").append(snapshot.get_m5()) + .append(",").append("m15:").append(snapshot.get_m15()); + return sb.toString(); + } + + public static String histogramStr(MetricSnapshot snapshot) { + StringBuilder sb = new StringBuilder(128); + sb.append("histogram"); + sb.append("(").append("id:").append(snapshot.get_metricId()).append(",").append("min:").append(snapshot.get_min()).append(",").append("max:") + .append(snapshot.get_max()).append(",").append("mean:").append(snapshot.get_mean()).append(",").append("p50:").append(snapshot.get_p50()) + .append(",").append("p75:").append(snapshot.get_p75()).append(",").append("p95:").append(snapshot.get_p95()).append(",").append("p98:") + .append(snapshot.get_p98()).append(",").append("p99:").append(snapshot.get_p99()).append(",").append("pts:").append(snapshot.get_points_size()) + .append(")"); + return sb.toString(); + } + + public static String timerStr(MetricSnapshot snapshot) { + StringBuilder sb = new StringBuilder(128); + sb.append("timer"); + sb.append("(").append("id:").append(snapshot.get_metricId()).append(",").append("min:").append(snapshot.get_min()).append(",").append("max:") + .append(snapshot.get_max()).append(",").append("mean:").append(snapshot.get_mean()).append(",").append("p50:").append(snapshot.get_p50()) + .append(",").append("p75:").append(snapshot.get_p75()).append(",").append("p95:").append(snapshot.get_p95()).append(",").append("p98:") + .append(snapshot.get_p98()).append(",").append("p99:").append(snapshot.get_p99()).append(",").append("m1:").append(snapshot.get_m1()) + .append(",").append("m5:").append(snapshot.get_m5()).append(",").append("m15:").append(snapshot.get_m15()).append(",").append("pts:") + .append(snapshot.get_points_size()).append(")"); + return sb.toString(); + } + + public static void printMetricSnapshot(AsmMetric metric, Map<Integer, AsmSnapshot> snapshots) { + StringBuilder sb = new StringBuilder(128); + sb.append("metric:").append(metric.getMetricName()).append(", "); + for (Map.Entry<Integer, AsmSnapshot> entry : snapshots.entrySet()) { + sb.append("win:").append(entry.getKey()).append(", v:") + .append(getSnapshotDefaultValue(entry.getValue())).append("; "); + } + + LOG.info(sb.toString()); + } + + public static double getSnapshotDefaultValue(AsmSnapshot snapshot) { + if (snapshot instanceof AsmCounterSnapshot) { + return ((AsmCounterSnapshot) snapshot).getV(); + } else if (snapshot instanceof AsmGaugeSnapshot) { + return ((AsmGaugeSnapshot) snapshot).getV(); + } else if (snapshot instanceof AsmMeterSnapshot) { + return ((AsmMeterSnapshot) snapshot).getM1(); + } else if (snapshot instanceof AsmHistogramSnapshot) { + return ((AsmHistogramSnapshot) snapshot).getSnapshot().getMean(); + } else if (snapshot instanceof AsmTimerSnapshot) { + return ((AsmTimerSnapshot) snapshot).getHistogram().getMean(); + } + return 0; + } + + public static void printMetricInfo(MetricInfo metricInfo) { + iterateMap(metricInfo.get_metrics()); + } + + public static void printMetricInfo(MetricInfo metricInfo, Set<String> metrics) { + iterateMap(metricInfo.get_metrics(), metrics); + } + + public static <T> void iterateMap(Map<String, Map<Integer, T>> map) { + iterateMap(map, null); + } + + public static <T> void iterateMap(Map<String, Map<Integer, T>> map, Set<String> metrics) { + for (Map.Entry<String, Map<Integer, T>> entry : map.entrySet()) { + String name = entry.getKey(); + boolean print = false; + if (metrics == null) { + print = true; + } else { + for (String metric : metrics) { + if (name.contains(metric)) { + print = true; + break; + } + } + } + if (print) { + Map<Integer, T> winData = entry.getValue(); + for (Map.Entry<Integer, T> win : winData.entrySet()) { + T v = win.getValue(); + String str; + if (v instanceof MetricSnapshot) { + str = MetricUtils.str(v); + } else { + str = v.toString(); + } + LOG.info("metric:{}, win:{}, data:{}", name, win.getKey(), str); + } + } + } + } + + private static <T> void iter(Map<String, T> map, Func func, Object... args) { + for (Map.Entry<String, T> entry : map.entrySet()) { + func.exec(entry, args); + } + } + + + public interface Func { + void exec(Object... args); + } + + + /** + * print default value for all metrics, in the format of: name|type|value + */ + public static void logMetrics(MetricInfo metricInfo) { + Map<String, Map<Integer, MetricSnapshot>> metrics = metricInfo.get_metrics(); + if (metrics != null) { + LOG.info("\nprint metrics:"); + for (Map.Entry<String, Map<Integer, MetricSnapshot>> entry : metrics.entrySet()) { + String name = entry.getKey(); + MetricSnapshot metricSnapshot = entry.getValue().get(AsmWindow.M1_WINDOW); + if (metricSnapshot != null) { + MetricType metricType = MetricType.parse(metricSnapshot.get_metricType()); + double v; + if (metricType == MetricType.COUNTER) { + v = metricSnapshot.get_longValue(); + } else if (metricType == MetricType.GAUGE) { + v = metricSnapshot.get_doubleValue(); + } else if (metricType == MetricType.METER) { + v = metricSnapshot.get_m1(); + } else if (metricType == MetricType.HISTOGRAM) { + v = metricSnapshot.get_mean(); + } else if (metricType == MetricType.TIMER) { + v = metricSnapshot.get_mean(); + } else { + v = 0; + } + LOG.info("{}|{}|{}", metricType, v, name); + } + } + LOG.info("\n"); + } + } + + public static void main(String[] args) { + String workerName = "WC@[email protected]@6800@sys@Counter"; + System.out.println(worker2topologyName(workerName)); + } +}
