Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2203#discussion_r140973733 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java --- @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + + private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + + private static final MetricRegistry REGISTRY = new MetricRegistry(); + + private static final List<StormReporter> REPORTERS = new ArrayList<>(); + + private static String hostName = null; + + public static <T> SimpleGauge<T> gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ + SimpleGauge<T> gauge = new SimpleGauge<>(initialValue); + String metricName = metricName(name, topologyId, componentId, port); + if(REGISTRY.getGauges().containsKey(metricName)){ + return (SimpleGauge)REGISTRY.getGauges().get(metricName); + } else { + return REGISTRY.register(metricName, gauge); + } + } + + public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ + return new DisruptorMetrics( + StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), + StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), + StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), + StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), + StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), + StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), + StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), + StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) + ); + } + + public static Meter meter(String name, WorkerTopologyContext context, String componentId){ + String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort()); + return REGISTRY.meter(metricName); + } + + public static void start(Map<String, Object> stormConfig, DaemonType type){ + String localHost = (String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME); + if(localHost != null){ + hostName = localHost; + } else { + try { + hostName = InetAddress.getLocalHost().getCanonicalHostName(); + } catch (UnknownHostException e) { + LOG.warn("Unable to determine hostname while starting the metrics system. Hostname ill be reported" + + " as 'localhost'."); + } + } + + LOG.info("Starting metrics reporters..."); + List<Map<String, Object>> reporterList = (List<Map<String, Object>>)stormConfig.get(Config.STORM_METRICS_REPORTERS); + if(reporterList != null && reporterList.size() > 0) { + for (Map<String, Object> reporterConfig : reporterList) { + // only start those requested + List<String> daemons = (List<String>) reporterConfig.get("daemons"); + for (String daemon : daemons) { + if (DaemonType.valueOf(daemon.toUpperCase()) == type) { + startReporter(stormConfig, reporterConfig); + } + } + } + } + } + + public static MetricRegistry registry(){ + return REGISTRY; + } + + private static void startReporter(Map<String, Object> stormConfig, Map<String, Object> reporterConfig){ + String clazz = (String)reporterConfig.get("class"); + StormReporter reporter = null; + LOG.info("Attempting to instantiate reporter class: {}", clazz); + try{ + reporter = (StormReporter)Metrics2Utils.instantiate(clazz); --- End diff -- We can replace it with `Utils.newInstance()`, and remove Metrics2Utils class.
---