Adding CSV and Console statistics reporter plugins. Make statistics reporter plugins a list.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ea6cccfc Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ea6cccfc Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ea6cccfc Branch: refs/heads/1.x-branch Commit: ea6cccfcab257fe6854867c2b54029378953080a Parents: 6868233 Author: Kishor Patil <[email protected]> Authored: Wed Feb 3 13:47:09 2016 -0600 Committer: Kishor Patil <[email protected]> Committed: Fri Feb 5 19:28:32 2016 +0000 ---------------------------------------------------------------------- conf/defaults.yaml | 4 + .../src/clj/org/apache/storm/daemon/common.clj | 9 ++- storm-core/src/jvm/org/apache/storm/Config.java | 2 +- .../storm/statistics/StatisticsUtils.java | 27 +++++-- .../reporters/ConsolePreparableReporter.java | 65 ++++++++++++++++ .../reporters/CsvPreparableReporter.java | 80 ++++++++++++++++++++ .../reporters/JMXPreparableReporter.java | 49 ------------ .../reporters/JmxPreparableReporter.java | 56 ++++++++++++++ 8 files changed, 234 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/ea6cccfc/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 8873d12..b468290 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -281,3 +281,7 @@ pacemaker.thread.timeout: 10 pacemaker.childopts: "-Xmx1024m" pacemaker.auth.method: "NONE" pacemaker.kerberos.users: [] + +#default plugin for daemon statistics reporter +storm.statistics.preparable.reporter.plugin: + - "org.apache.storm.statistics.reporters.JmxPreparableReporter" http://git-wip-us.apache.org/repos/asf/storm/blob/ea6cccfc/storm-core/src/clj/org/apache/storm/daemon/common.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj 
b/storm-core/src/clj/org/apache/storm/daemon/common.clj index 44a1d43..6b7d539 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/common.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj @@ -33,12 +33,17 @@ (:require [org.apache.storm.thrift :as thrift]) (:require [metrics.core :refer [default-registry]])) -(defn start-metrics-reporters [conf] - (doto (StatisticsUtils/getPreparableReporter conf) +(defn start-metrics-reporter [reporter conf] + (doto reporter (.prepare default-registry conf) (.start)) (log-message "Started statistics report plugin...")) +(defn start-metrics-reporters [conf] + (doseq [reporter (StatisticsUtils/getPreparableReporters conf)] + (start-metrics-reporter reporter conf))) + + (def ACKER-COMPONENT-ID acker/ACKER-COMPONENT-ID) (def ACKER-INIT-STREAM-ID acker/ACKER-INIT-STREAM-ID) (def ACKER-ACK-STREAM-ID acker/ACKER-ACK-STREAM-ID) http://git-wip-us.apache.org/repos/asf/storm/blob/ea6cccfc/storm-core/src/jvm/org/apache/storm/Config.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java index bf50223..9d18667 100644 --- a/storm-core/src/jvm/org/apache/storm/Config.java +++ b/storm-core/src/jvm/org/apache/storm/Config.java @@ -143,7 +143,7 @@ public class Config extends HashMap<String, Object> { * A list of statistics preparable reporter class names. 
*/ @NotNull - @isImplementationOfClass(implementsClass = org.apache.storm.statistics.reporters.PreparableReporter.class) + @isStringList public static final String STORM_STATISTICS_PREPARABLE_REPORTER_PLUGIN = "storm.statistics.preparable.reporter.plugin"; /** http://git-wip-us.apache.org/repos/asf/storm/blob/ea6cccfc/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java b/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java index 19f7690..666e44d 100644 --- a/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java +++ b/storm-core/src/jvm/org/apache/storm/statistics/StatisticsUtils.java @@ -1,25 +1,40 @@ package org.apache.storm.statistics; import org.apache.storm.Config; -import org.apache.storm.statistics.reporters.JMXPreparableReporter; +import org.apache.storm.statistics.reporters.JmxPreparableReporter; import org.apache.storm.statistics.reporters.PreparableReporter; import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; import java.util.Map; public class StatisticsUtils { private final static Logger LOG = LoggerFactory.getLogger(StatisticsUtils.class); - public static PreparableReporter getPreparableReporter(Map stormConf) { - PreparableReporter reporter = new JMXPreparableReporter(); - String clazz = (String) stormConf.get(Config.STORM_STATISTICS_PREPARABLE_REPORTER_PLUGIN); + public static List<PreparableReporter> getPreparableReporters(Map stormConf) { + PreparableReporter reporter = new JmxPreparableReporter(); + List<String> clazzes = (List<String>) stormConf.get(Config.STORM_STATISTICS_PREPARABLE_REPORTER_PLUGIN); + List<PreparableReporter> reporterList = new ArrayList<>(); + + if (clazzes != null) { + for(String clazz: clazzes ) { + 
reporterList.add(getPreparableReporter(clazz)); + } + } + if(reporterList.isEmpty()) { + reporterList.add(new JmxPreparableReporter()); + } + return reporterList; + } + + private static PreparableReporter getPreparableReporter(String clazz) { + PreparableReporter reporter = null; LOG.info("Using statistics reporter plugin:" + clazz); if(clazz != null) { reporter = (PreparableReporter) Utils.newInstance(clazz); - } else { - reporter = new JMXPreparableReporter(); } return reporter; } http://git-wip-us.apache.org/repos/asf/storm/blob/ea6cccfc/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java new file mode 100644 index 0000000..f545b5b --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/statistics/reporters/ConsolePreparableReporter.java @@ -0,0 +1,65 @@ +package org.apache.storm.statistics.reporters; + +import com.codahale.metrics.ConsoleReporter; +import com.codahale.metrics.MetricFilter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.PrintStream; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class ConsolePreparableReporter implements PreparableReporter<ConsoleReporter> { + private final static Logger LOG = LoggerFactory.getLogger(ConsolePreparableReporter.class); + ConsoleReporter reporter = null; + + @Override + public void prepare(MetricRegistry metricsRegistry, Map stormConf) { + LOG.info("Preparing..."); + ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(metricsRegistry); + PrintStream stream = (PrintStream)stormConf.get(":stream"); + if (stream != null) { + builder.outputTo(stream); + } 
+ Locale locale = (Locale)stormConf.get(":locale"); + if (locale != null) { + builder.formattedFor(locale); + } + String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null); + if (rateUnit != null) { + builder.convertRatesTo(TimeUnit.valueOf(rateUnit)); + } + String durationUnit = Utils.getString(stormConf.get(":duration-unit"), null); + if (durationUnit != null) { + builder.convertDurationsTo(TimeUnit.valueOf(durationUnit)); + } + MetricFilter filter = (MetricFilter) stormConf.get(":filter"); + if (filter != null) { + builder.filter(filter); + } + reporter = builder.build(); + } + + @Override + public void start() { + if (reporter != null ) { + LOG.info("Starting..."); + reporter.start(10, TimeUnit.SECONDS); + } else { + throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); + } + } + + @Override + public void stop() { + if (reporter !=null) { + LOG.info("Stopping..."); + reporter.stop(); + } else { + throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/ea6cccfc/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java new file mode 100644 index 0000000..610df33 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/statistics/reporters/CsvPreparableReporter.java @@ -0,0 +1,80 @@ +package org.apache.storm.statistics.reporters; + +import com.codahale.metrics.CsvReporter; +import com.codahale.metrics.MetricFilter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import 
java.util.Locale; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class CsvPreparableReporter implements PreparableReporter<CsvReporter> { + private final static Logger LOG = LoggerFactory.getLogger(CsvPreparableReporter.class); + CsvReporter reporter = null; + + @Override + public void prepare(MetricRegistry metricsRegistry, Map stormConf) { + LOG.info("Preparing..."); + CsvReporter.Builder builder = CsvReporter.forRegistry(metricsRegistry); + + Locale locale = (Locale) stormConf.get(":locale"); + if (locale != null) { + builder.formatFor(locale); + } + String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null); + if (rateUnit != null) { + builder.convertRatesTo(TimeUnit.valueOf(rateUnit)); + } + String durationUnit = Utils.getString(stormConf.get(":duration-unit"), null); + if (durationUnit != null) { + builder.convertDurationsTo(TimeUnit.valueOf(durationUnit)); + } + MetricFilter filter = (MetricFilter) stormConf.get(":filter"); + if (filter != null) { + builder.filter(filter); + } + String localStormDirLocation = Utils.getString(stormConf.get(Config.STORM_LOCAL_DIR), "."); + File logDir = new File(localStormDirLocation + "csvmetrics" ); + validateCreateOutputDir(logDir); + reporter = builder.build(logDir); + } + + @Override + public void start() { + if (reporter != null) { + LOG.info("Starting..."); + reporter.start(10, TimeUnit.SECONDS); + } else { + throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); + } + } + + @Override + public void stop() { + if (reporter != null) { + LOG.info("Stopping..."); + reporter.stop(); + } else { + throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); + } + } + + + private void validateCreateOutputDir(File dir) { + if (!dir.exists()) { + dir.mkdirs(); + } + if (!dir.canWrite()) { + throw new IllegalStateException(dir.getName() + " does not have write permissions."); + } + if (!dir.isDirectory()) { + 
throw new IllegalStateException(dir.getName() + " is not a directory."); + } + } +} + http://git-wip-us.apache.org/repos/asf/storm/blob/ea6cccfc/storm-core/src/jvm/org/apache/storm/statistics/reporters/JMXPreparableReporter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/JMXPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/JMXPreparableReporter.java deleted file mode 100644 index 5d94ffc..0000000 --- a/storm-core/src/jvm/org/apache/storm/statistics/reporters/JMXPreparableReporter.java +++ /dev/null @@ -1,49 +0,0 @@ -package org.apache.storm.statistics.reporters; - -import com.codahale.metrics.JmxReporter; -import com.codahale.metrics.MetricFilter; -import com.codahale.metrics.MetricRegistry; -import org.apache.storm.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; -import java.util.concurrent.TimeUnit; - -public class JMXPreparableReporter implements PreparableReporter<JmxReporter> { - private final static Logger LOG = LoggerFactory.getLogger(JMXPreparableReporter.class); - - JmxReporter reporter = null; - - @Override - public void prepare(MetricRegistry metricsRegistry, Map stormConf) { - LOG.info("Preparing..."); - JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry); - String domain = Utils.getString(stormConf.get(":domain"), null); - if (domain != null) { - builder.inDomain(domain); - } - String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null); - if (rateUnit != null) { - builder.convertRatesTo(TimeUnit.valueOf(rateUnit)); - } - MetricFilter filter = (MetricFilter) stormConf.get(":filter"); - if (filter != null) { - builder.filter(filter); - } - reporter = builder.build(); - - } - - @Override - public void start() { - LOG.info("Starting..."); - reporter.start(); - } - - @Override - public void stop() { - LOG.info("Stopping..."); - reporter.stop(); - } -} 
http://git-wip-us.apache.org/repos/asf/storm/blob/ea6cccfc/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java b/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java new file mode 100644 index 0000000..ba59611 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/statistics/reporters/JmxPreparableReporter.java @@ -0,0 +1,56 @@ +package org.apache.storm.statistics.reporters; + +import com.codahale.metrics.JmxReporter; +import com.codahale.metrics.MetricFilter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class JmxPreparableReporter implements PreparableReporter<JmxReporter> { + private final static Logger LOG = LoggerFactory.getLogger(JmxPreparableReporter.class); + JmxReporter reporter = null; + + @Override + public void prepare(MetricRegistry metricsRegistry, Map stormConf) { + LOG.info("Preparing..."); + JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry); + String domain = Utils.getString(stormConf.get(":domain"), null); + if (domain != null) { + builder.inDomain(domain); + } + String rateUnit = Utils.getString(stormConf.get(":rate-unit"), null); + if (rateUnit != null) { + builder.convertRatesTo(TimeUnit.valueOf(rateUnit)); + } + MetricFilter filter = (MetricFilter) stormConf.get(":filter"); + if (filter != null) { + builder.filter(filter); + } + reporter = builder.build(); + + } + + @Override + public void start() { + if (reporter != null ) { + LOG.info("Starting..."); + reporter.start(); + } else { + throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); + } + } + + @Override + public void stop() { 
+ if (reporter !=null) { + LOG.info("Stopping..."); + reporter.stop(); + } else { + throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); + } + } +}
