AMBARI-19732. Allow all sinks a config override to point to a different ZK quorum. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/042f4279 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/042f4279 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/042f4279 Branch: refs/heads/branch-dev-patch-upgrade Commit: 042f4279889b579bf09afd4f918e94b6598119d6 Parents: 828e24f Author: Siddharth Wagle <swa...@hortonworks.com> Authored: Thu Jan 26 14:03:10 2017 -0800 Committer: Siddharth Wagle <swa...@hortonworks.com> Committed: Thu Jan 26 14:03:10 2017 -0800 ---------------------------------------------------------------------- .../metrics2/sink/timeline/AbstractTimelineMetricsSink.java | 1 + .../metrics2/sink/kafka/KafkaTimelineMetricsReporter.java | 6 ++++-- .../metrics2/sink/storm/StormTimelineMetricsReporter.java | 9 ++++++++- .../metrics2/sink/storm/StormTimelineMetricsReporter.java | 5 ++++- .../metrics2/sink/storm/StormTimelineMetricsSink.java | 6 +++++- 5 files changed, 22 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/042f4279/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java index 9bc3be5..f5a02e4 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java @@ -70,6 +70,7 @@ public abstract class AbstractTimelineMetricsSink { public static final String COLLECTOR_PROTOCOL = "protocol"; public static final String COLLECTOR_PORT = "port"; public static final String ZOOKEEPER_QUORUM = "zookeeper.quorum"; + public static final String COLLECTOR_ZOOKEEPER_QUORUM = "metrics.zookeeper.quorum"; public static final int DEFAULT_POST_TIMEOUT_SECONDS = 10; public static final String SKIP_COUNTER_TRANSFROMATION = "skipCounterDerivative"; public static final String RPC_METRIC_PREFIX = "metric.rpc"; http://git-wip-us.apache.org/repos/asf/ambari/blob/042f4279/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java index 5892599..b9ca9f5 100644 --- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java +++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java @@ -86,7 +86,7 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink private TimelineScheduledReporter reporter; private TimelineMetricsCache metricsCache; private int timeoutSeconds = 10; - private String zookeeperQuorum; + private String zookeeperQuorum = null; private String[] excludedMetricsPrefixes; private String[] includedMetricsPrefixes; @@ -155,7 +155,9 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink int metricsSendInterval = props.getInt(TIMELINE_METRICS_SEND_INTERVAL_PROPERTY, MAX_EVICTION_TIME_MILLIS); int maxRowCacheSize = props.getInt(TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY, MAX_RECS_PER_NAME_DEFAULT); - zookeeperQuorum = props.getString("zookeeper.connect"); + zookeeperQuorum = props.containsKey(COLLECTOR_ZOOKEEPER_QUORUM) ? + props.getString(COLLECTOR_ZOOKEEPER_QUORUM) : props.getString("zookeeper.connect"); + metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, TIMELINE_DEFAULT_PORT); collectorHosts = parseHostsStringIntoCollection(props.getString(TIMELINE_HOSTS_PROPERTY, TIMELINE_DEFAULT_HOST)); metricCollectorProtocol = props.getString(TIMELINE_PROTOCOL_PROPERTY, TIMELINE_DEFAULT_PROTOCOL); http://git-wip-us.apache.org/repos/asf/ambari/blob/042f4279/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java index ef73a0e..fad9705 100644 --- a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java +++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java @@ -24,6 +24,8 @@ import backtype.storm.generated.TopologySummary; import backtype.storm.metric.IClusterReporter; import backtype.storm.utils.NimbusClient; import backtype.storm.utils.Utils; + +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; @@ -114,7 +116,12 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink collectorHosts = parseHostsStringIntoCollection(cf.get(COLLECTOR_HOSTS_PROPERTY).toString()); protocol = cf.get(COLLECTOR_PROTOCOL) != null ? cf.get(COLLECTOR_PROTOCOL).toString() : "http"; port = cf.get(COLLECTOR_PORT) != null ? cf.get(COLLECTOR_PORT).toString() : "6188"; - zkQuorum = cf.get(ZOOKEEPER_QUORUM) != null ? cf.get(ZOOKEEPER_QUORUM).toString() : null; + Object zkQuorumObj = cf.get(COLLECTOR_ZOOKEEPER_QUORUM); + if (zkQuorumObj != null) { + zkQuorum = zkQuorumObj.toString(); + } else { + zkQuorum = cf.get(ZOOKEEPER_QUORUM) != null ? cf.get(ZOOKEEPER_QUORUM).toString() : null; + } timeoutSeconds = cf.get(METRICS_POST_TIMEOUT_SECONDS) != null ? Integer.parseInt(cf.get(METRICS_POST_TIMEOUT_SECONDS).toString()) : http://git-wip-us.apache.org/repos/asf/ambari/blob/042f4279/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java index 802e57d..e72d01f 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java @@ -18,6 +18,7 @@ package org.apache.hadoop.metrics2.sink.storm; +import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.ClassUtils; import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; @@ -106,7 +107,9 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink collectorHosts = parseHostsStringIntoCollection(configuration.getProperty(COLLECTOR_HOSTS_PROPERTY)); protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http"); port = configuration.getProperty(COLLECTOR_PORT, "6188"); - zkQuorum = configuration.getProperty(ZOOKEEPER_QUORUM); + + zkQuorum = StringUtils.isEmpty(conf.getProperty(COLLECTOR_ZOOKEEPER_QUORUM)) ? + conf.getProperty(ZOOKEEPER_QUORUM) : conf.getProperty(COLLECTOR_ZOOKEEPER_QUORUM); timeoutSeconds = configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS) != null ? Integer.parseInt(configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS)) : http://git-wip-us.apache.org/repos/asf/ambari/blob/042f4279/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java index 5a3eac1..f58f549 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java @@ -18,6 +18,7 @@ package org.apache.hadoop.metrics2.sink.storm; +import org.apache.commons.lang.StringUtils; import org.apache.storm.Constants; import org.apache.storm.metric.api.IMetricsConsumer; import org.apache.storm.task.IErrorReporter; @@ -126,7 +127,10 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID); metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval); collectorHosts = parseHostsStringIntoCollection(configuration.getProperty(COLLECTOR_HOSTS_PROPERTY)); - zkQuorum = configuration.getProperty("zookeeper.quorum"); + + zkQuorum = StringUtils.isEmpty(configuration.getProperty(COLLECTOR_ZOOKEEPER_QUORUM)) ? + configuration.getProperty("zookeeper.quorum") : configuration.getProperty(COLLECTOR_ZOOKEEPER_QUORUM); + protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http"); port = configuration.getProperty(COLLECTOR_PORT, "6188");