Repository: ambari Updated Branches: refs/heads/trunk ba16ce33e -> faf69f4b6
AMBARI-9511. Remove hadoop-common*.jar dep from sinks. (mpapyrkovskyy) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/faf69f4b Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/faf69f4b Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/faf69f4b Branch: refs/heads/trunk Commit: faf69f4b611e4c5737a5789ee9d268a7c7e8ef81 Parents: ba16ce3 Author: Myroslav Papirkovskyy <mpapyrkovs...@hortonworks.com> Authored: Thu Feb 19 15:55:03 2015 +0200 Committer: Myroslav Papirkovskyy <mpapyrkovs...@hortonworks.com> Committed: Tue Mar 3 18:18:53 2015 +0200 ---------------------------------------------------------------------- ambari-metrics/ambari-metrics-common/pom.xml | 16 ++- .../timeline/AbstractTimelineMetricsSink.java | 3 + .../metrics2/sink/timeline/TimelineMetric.java | 10 +- .../metrics2/sink/timeline/TimelineMetrics.java | 9 +- .../timeline/cache/TimelineMetricsCache.java | 9 +- .../timeline/configuration/Configuration.java | 6 +- .../hadoop/metrics2/sink/util/Servers.java | 111 +++++++++++++++++++ .../cache/TimelineMetricsCacheTest.java | 9 +- .../ambari-metrics-flume-sink/pom.xml | 5 + .../sink/flume/FlumeTimelineMetricsSink.java | 12 +- .../flume/FlumeTimelineMetricsSinkTest.java | 12 +- .../ambari-metrics-hadoop-sink/pom.xml | 5 + .../timeline/HadoopTimelineMetricsSink.java | 27 +++-- .../timeline/HadoopTimelineMetricsSinkTest.java | 41 +++---- .../ambari-metrics-kafka-sink/pom.xml | 12 +- .../kafka/KafkaTimelineMetricsReporter.java | 2 +- .../ambari-metrics-storm-sink/pom.xml | 5 + .../storm/StormTimelineMetricsReporter.java | 3 +- .../sink/storm/StormTimelineMetricsSink.java | 2 +- .../storm/StormTimelineMetricsSinkTest.java | 20 ++-- 20 files changed, 240 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/faf69f4b/ambari-metrics/ambari-metrics-common/pom.xml 
---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/pom.xml b/ambari-metrics/ambari-metrics-common/pom.xml index 4658cfe..9cad5d4 100644 --- a/ambari-metrics/ambari-metrics-common/pom.xml +++ b/ambari-metrics/ambari-metrics-common/pom.xml @@ -41,14 +41,24 @@ <version>1.1.1</version> </dependency> <dependency> + <groupId>commons-httpclient</groupId> + <artifactId>commons-httpclient</artifactId> + <version>3.1</version> + </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-xc</artifactId> + <version>1.9.13</version> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>2.4.0</version> + <artifactId>hadoop-annotations</artifactId> + <version>2.6.0</version> </dependency> <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-mapper-asl</artifactId> - <version>1.8.0</version> + <version>1.9.13</version> </dependency> <dependency> <groupId>junit</groupId> http://git-wip-us.apache.org/repos/asf/ambari/blob/faf69f4b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java index 17560ac..4f5c6a1 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java @@ -21,6 +21,9 @@ import java.io.IOException; import java.net.ConnectException; import java.net.SocketAddress; +import 
java.io.IOException; +import java.net.SocketAddress; + import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.methods.PostMethod; import org.apache.commons.httpclient.methods.StringRequestEntity; http://git-wip-us.apache.org/repos/asf/ambari/blob/faf69f4b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java index 68b4be8..f482e54 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java @@ -17,14 +17,16 @@ */ package org.apache.hadoop.metrics2.sink.timeline; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; +import java.util.Map; +import java.util.TreeMap; + import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlRootElement; -import java.util.Map; -import java.util.TreeMap; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; @XmlRootElement(name = "metric") @XmlAccessorType(XmlAccessType.NONE) http://git-wip-us.apache.org/repos/asf/ambari/blob/faf69f4b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java index 4355fb1..3eb0e89 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java @@ -17,15 +17,16 @@ */ package org.apache.hadoop.metrics2.sink.timeline; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; +import java.util.ArrayList; +import java.util.List; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlRootElement; -import java.util.ArrayList; -import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; /** * The class that hosts a list of timeline entities. 
http://git-wip-us.apache.org/repos/asf/ambari/blob/faf69f4b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java index 0f2c9a3..224b490 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java @@ -17,12 +17,10 @@ */ package org.apache.hadoop.metrics2.sink.timeline.cache; -import com.google.common.base.Optional; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.metrics2.MetricType; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import java.util.HashMap; @@ -160,7 +158,8 @@ public class TimelineMetricsCache { String metricName = timelineMetric.getMetricName(); double firstValue = timelineMetric.getMetricValues().size() > 0 ? timelineMetric.getMetricValues().entrySet().iterator().next().getValue() : 0; - double previousValue = Optional.fromNullable(counterMetricLastValue.get(metricName)).or(firstValue); + Double value = counterMetricLastValue.get(metricName); + double previousValue = value != null ? 
value : firstValue; Map<Long, Double> metricValues = timelineMetric.getMetricValues(); Map<Long, Double> newMetricValues = new TreeMap<Long, Double>(); for (Map.Entry<Long, Double> entry : metricValues.entrySet()) { @@ -171,8 +170,8 @@ public class TimelineMetricsCache { counterMetricLastValue.put(metricName, previousValue); } - public void putTimelineMetric(TimelineMetric timelineMetric, MetricType type) { - if (type == MetricType.COUNTER) { + public void putTimelineMetric(TimelineMetric timelineMetric, boolean isCounter) { + if (isCounter) { transformMetricValuesToDerivative(timelineMetric); } putTimelineMetric(timelineMetric); http://git-wip-us.apache.org/repos/asf/ambari/blob/faf69f4b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/configuration/Configuration.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/configuration/Configuration.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/configuration/Configuration.java index 940ea75..a0380e1 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/configuration/Configuration.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/configuration/Configuration.java @@ -18,14 +18,14 @@ package org.apache.hadoop.metrics2.sink.timeline.configuration; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.util.Properties; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + public class Configuration { public final Log LOG = LogFactory.getLog(this.getClass()); private final Properties properties; 
http://git-wip-us.apache.org/repos/asf/ambari/blob/faf69f4b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/util/Servers.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/util/Servers.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/util/Servers.java new file mode 100644 index 0000000..76da0a2 --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/util/Servers.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.metrics2.sink.util; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Helpers to handle server addresses + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class Servers { + /** + * This class is not intended to be instantiated + */ + private Servers() {} + + /** + * Parses a space and/or comma separated sequence of server specifications + * of the form <i>hostname</i> or <i>hostname:port</i>. If + * the specs string is null, defaults to localhost:defaultPort. + * + * @param specs server specs (see description) + * @param defaultPort the default port if not specified + * @return a list of InetSocketAddress objects. + */ + public static List<InetSocketAddress> parse(String specs, int defaultPort) { + List<InetSocketAddress> result = new ArrayList<InetSocketAddress>(); + if (specs == null) { + result.add(new InetSocketAddress("localhost", defaultPort)); + } else { + String[] specStrings = specs.split("[ ,]+"); + for (String specString : specStrings) { + result.add(createSocketAddr(specString, defaultPort)); + } + } + return result; + } + + /** + * @param host + * @param port + * @return a InetSocketAddress created with the specified host and port + */ + private static InetSocketAddress createSocketAddr(String target, int defaultPort) { + String helpText = ""; + if (target == null) { + throw new IllegalArgumentException("Target address cannot be null." + helpText); + } + boolean hasScheme = target.contains("://"); + URI uri = null; + try { + uri = hasScheme ? 
URI.create(target) : URI.create("dummyscheme://" + target); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Does not contain a valid host:port authority: " + target + helpText); + } + + String host = uri.getHost(); + int port = uri.getPort(); + if (port == -1) { + port = defaultPort; + } + String path = uri.getPath(); + + if ((host == null) || (port < 0) || (!hasScheme && path != null && !path.isEmpty())) { + throw new IllegalArgumentException("Does not contain a valid host:port authority: " + target + helpText); + } + return createSocketAddrForHost(host, port); + } + + /** + * @param host + * @param port + * @return a InetSocketAddress created with the specified host and port + */ + private static InetSocketAddress createSocketAddrForHost(String host, int port) { + InetSocketAddress addr; + try { + InetAddress iaddr = InetAddress.getByName(host); + iaddr = InetAddress.getByAddress(host, iaddr.getAddress()); + addr = new InetSocketAddress(iaddr, port); + } catch (UnknownHostException e) { + addr = InetSocketAddress.createUnresolved(host, port); + } + return addr; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/faf69f4b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java index 8f07a27..4a13d63 100644 --- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java +++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java @@ -17,7 +17,6 @@ */ package 
org.apache.hadoop.metrics2.sink.timeline.cache; -import org.apache.hadoop.metrics2.MetricType; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.junit.Test; @@ -41,13 +40,13 @@ public class TimelineMetricsCacheTest { TimelineMetric metric = createTimelineMetric(new TreeMap<Long, Double>() {{ put(1L, 10.0); }}, DEFAULT_START_TIME); - timelineMetricsCache.putTimelineMetric(metric, MetricType.COUNTER); + timelineMetricsCache.putTimelineMetric(metric, true); metric = createTimelineMetric(new TreeMap<Long, Double>() {{ put(2L, 10.0); put(3L, 20.0); put(4L, 30.0); }}, DEFAULT_START_TIME + 2 * TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS); - timelineMetricsCache.putTimelineMetric(metric, MetricType.COUNTER); + timelineMetricsCache.putTimelineMetric(metric, true); TimelineMetric cachedMetric = timelineMetricsCache.getTimelineMetric(METRIC_NAME); assertEquals(0, cachedMetric.getMetricValues().get(1L), delta); @@ -60,12 +59,12 @@ public class TimelineMetricsCacheTest { put(6L, 120.0); put(7L, 230.0); }}, DEFAULT_START_TIME + 3 * TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS); - timelineMetricsCache.putTimelineMetric(metric, MetricType.COUNTER); + timelineMetricsCache.putTimelineMetric(metric, true); metric = createTimelineMetric(new TreeMap<Long, Double>() {{ put(8L, 300.0); }}, DEFAULT_START_TIME + 5 * TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS); - timelineMetricsCache.putTimelineMetric(metric, MetricType.COUNTER); + timelineMetricsCache.putTimelineMetric(metric, true); cachedMetric = timelineMetricsCache.getTimelineMetric(METRIC_NAME); assertEquals(70, cachedMetric.getMetricValues().get(5L), delta); assertEquals(20, cachedMetric.getMetricValues().get(6L), delta); http://git-wip-us.apache.org/repos/asf/ambari/blob/faf69f4b/ambari-metrics/ambari-metrics-flume-sink/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-flume-sink/pom.xml 
b/ambari-metrics/ambari-metrics-flume-sink/pom.xml index f11b8b2..a83d7b0 100644 --- a/ambari-metrics/ambari-metrics-flume-sink/pom.xml +++ b/ambari-metrics/ambari-metrics-flume-sink/pom.xml @@ -108,6 +108,11 @@ limitations under the License. <version>${project.version}</version> </dependency> <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + <version>1.9.13</version> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/ambari/blob/faf69f4b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java index 0231b24..9e66c99 100644 --- a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java @@ -24,14 +24,13 @@ import org.apache.flume.Context; import org.apache.flume.FlumeException; import org.apache.flume.instrumentation.MonitorService; import org.apache.flume.instrumentation.util.JMXPollUtil; -import org.apache.hadoop.metrics2.MetricType; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException; import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; import 
org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration; -import org.apache.hadoop.metrics2.util.Servers; +import org.apache.hadoop.metrics2.sink.util.Servers; import java.io.IOException; import java.net.InetAddress; @@ -48,6 +47,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; + + public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implements MonitorService { private SocketAddress socketAddress; private String collectorUri; @@ -158,7 +159,7 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem TimelineMetric timelineMetric = createTimelineMetric(currentTimeMillis, component, attributeName, attributeValue); // Put intermediate values into the cache until it is time to send - metricsCache.putTimelineMetric(timelineMetric, getMetricType(attributeName)); + metricsCache.putTimelineMetric(timelineMetric, isCounterMetric(attributeName)); TimelineMetric cachedMetric = metricsCache.getTimelineMetric(attributeName); @@ -189,8 +190,7 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem } } - private MetricType getMetricType(String attributeName) { - return counterMetrics.contains(attributeName) ? 
- MetricType.COUNTER : MetricType.GAUGE; + private boolean isCounterMetric(String attributeName) { + return counterMetrics.contains(attributeName); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/faf69f4b/ambari-metrics/ambari-metrics-flume-sink/src/test/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/test/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-flume-sink/src/test/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java index 90831bf..647e026 100644 --- a/ambari-metrics/ambari-metrics-flume-sink/src/test/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java +++ b/ambari-metrics/ambari-metrics-flume-sink/src/test/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java @@ -18,6 +18,12 @@ package org.apache.hadoop.metrics2.sink.flume; +import static org.powermock.api.easymock.PowerMock.mockStatic; +import static org.powermock.api.easymock.PowerMock.replay; +import static org.powermock.api.easymock.PowerMock.verifyAll; + +import java.util.Collections; + import org.apache.commons.httpclient.HttpClient; import org.apache.flume.instrumentation.util.JMXPollUtil; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; @@ -27,10 +33,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import java.util.Collections; -import static org.powermock.api.easymock.PowerMock.mockStatic; -import static org.powermock.api.easymock.PowerMock.replay; -import static org.powermock.api.easymock.PowerMock.verifyAll; @RunWith(PowerMockRunner.class) @PrepareForTest(JMXPollUtil.class) @@ -102,4 +104,4 @@ public class FlumeTimelineMetricsSinkTest { collector.run(); 
verifyAll(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ambari/blob/faf69f4b/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml b/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml index 848a8f2..ee3bcd8 100644 --- a/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml +++ b/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml @@ -158,6 +158,11 @@ limitations under the License. <scope>compile</scope> </dependency> <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + <version>1.9.13</version> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/ambari/blob/faf69f4b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java index 25058b3..06f6011 100644 --- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java @@ -17,21 +17,33 @@ */ package org.apache.hadoop.metrics2.sink.timeline; +import java.io.IOException; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import 
java.util.Map; +import java.util.Set; + import org.apache.commons.configuration.SubsetConfiguration; import org.apache.commons.lang.ClassUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.metrics2.*; +import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.metrics2.MetricsException; +import org.apache.hadoop.metrics2.MetricsRecord; +import org.apache.hadoop.metrics2.MetricsSink; +import org.apache.hadoop.metrics2.MetricsTag; +import org.apache.hadoop.metrics2.MetricType; import org.apache.hadoop.metrics2.impl.MsInfo; import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; import org.apache.hadoop.metrics2.util.Servers; import org.apache.hadoop.net.DNS; -import java.io.IOException; -import java.net.SocketAddress; -import java.net.UnknownHostException; -import java.util.*; - @InterfaceAudience.Public @InterfaceStability.Evolving public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink implements MetricsSink { @@ -164,7 +176,8 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple timelineMetric.setType(ClassUtils.getShortCanonicalName(value, "Number")); timelineMetric.getMetricValues().put(startTime, value.doubleValue()); // Put intermediate values into the cache until it is time to send - metricsCache.putTimelineMetric(timelineMetric, metric.type()); + boolean isCounter = MetricType.COUNTER == metric.type(); + metricsCache.putTimelineMetric(timelineMetric, isCounter); // Retrieve all values from cache if it is time to send TimelineMetric cachedMetric = metricsCache.getTimelineMetric(name); http://git-wip-us.apache.org/repos/asf/ambari/blob/faf69f4b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java index d7b5d73..dddbbd0 100644 --- a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java +++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java @@ -18,43 +18,34 @@ package org.apache.hadoop.metrics2.sink.timeline; -import org.apache.commons.configuration.SubsetConfiguration; -import org.apache.commons.httpclient.HttpClient; -import org.apache.commons.httpclient.methods.PostMethod; -import org.apache.hadoop.metrics2.AbstractMetric; -import org.apache.hadoop.metrics2.MetricsRecord; -import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; -import org.easymock.Capture; -import org.easymock.EasyMock; -import org.easymock.IAnswer; -import org.easymock.IArgumentMatcher; -import org.junit.Assert; -import org.junit.Test; - -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_HOST_PROPERTY; import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.MAX_METRIC_ROW_CACHE_SIZE; import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.METRICS_SEND_INTERVAL; import static org.easymock.EasyMock.anyInt; import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.createMockBuilder; import static org.easymock.EasyMock.createNiceMock; import static 
org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.reportMatcher; import static org.easymock.EasyMock.verify; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import org.apache.commons.configuration.SubsetConfiguration; +import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.methods.PostMethod; +import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.metrics2.MetricsRecord; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.junit.Assert; +import org.junit.Test; + public class HadoopTimelineMetricsSinkTest { @Test @@ -249,4 +240,4 @@ public class HadoopTimelineMetricsSinkTest { Assert.assertEquals(new Double(5.0), values.next()); Assert.assertEquals(new Double(6.0), values.next()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ambari/blob/faf69f4b/ambari-metrics/ambari-metrics-kafka-sink/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/pom.xml b/ambari-metrics/ambari-metrics-kafka-sink/pom.xml index e385935..2aefe1d 100644 --- a/ambari-metrics/ambari-metrics-kafka-sink/pom.xml +++ b/ambari-metrics/ambari-metrics-kafka-sink/pom.xml @@ -42,7 +42,7 @@ limitations under the License. 
<goal>copy-dependencies</goal> </goals> <configuration> - <includeArtifactIds>commons-codec,commons-collections,commons-httpclient,commons-lang,commons-logging,guava,jackson-core-asl,jackson-mapper-asl,jackson-xc,hadoop-common</includeArtifactIds> + <includeArtifactIds>commons-codec,commons-collections,commons-httpclient,commons-lang,commons-logging,guava,jackson-core-asl,jackson-mapper-asl,jackson-xc</includeArtifactIds> <outputDirectory>${project.build.directory}/lib</outputDirectory> </configuration> </execution> @@ -130,6 +130,16 @@ limitations under the License. <version>2.2.0</version> </dependency> <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + <version>2.6</version> + </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + <version>1.9.13</version> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/ambari/blob/faf69f4b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java index 63097e5..1f44494 100644 --- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java +++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java @@ -40,7 +40,7 @@ import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import 
org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; -import org.apache.hadoop.metrics2.util.Servers; +import org.apache.hadoop.metrics2.sink.util.Servers; import com.yammer.metrics.Metrics; import com.yammer.metrics.core.Counter; http://git-wip-us.apache.org/repos/asf/ambari/blob/faf69f4b/ambari-metrics/ambari-metrics-storm-sink/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/pom.xml b/ambari-metrics/ambari-metrics-storm-sink/pom.xml index d069622..70bdec5 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/pom.xml +++ b/ambari-metrics/ambari-metrics-storm-sink/pom.xml @@ -113,6 +113,11 @@ limitations under the License. <version>${project.version}</version> </dependency> <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + <version>1.9.13</version> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/ambari/blob/faf69f4b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java index c969299..2d4baa3 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java @@ -29,8 +29,7 @@ import org.apache.commons.lang.Validate; import 
org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.hadoop.metrics2.util.Servers; -import org.codehaus.jackson.map.AnnotationIntrospector; +import org.apache.hadoop.metrics2.sink.util.Servers; import java.net.InetAddress; import java.net.InetSocketAddress; http://git-wip-us.apache.org/repos/asf/ambari/blob/faf69f4b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java index ffcc6ed..dd0e72f 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java @@ -30,7 +30,7 @@ import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException; import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration; -import org.apache.hadoop.metrics2.util.Servers; +import org.apache.hadoop.metrics2.sink.util.Servers; import java.io.IOException; import java.net.InetAddress; http://git-wip-us.apache.org/repos/asf/ambari/blob/faf69f4b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java index 95a9329..15021e5 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java @@ -18,18 +18,24 @@ package org.apache.hadoop.metrics2.sink.storm; -import backtype.storm.metric.api.IMetricsConsumer; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; + +import java.io.IOException; +import java.net.SocketAddress; +import java.util.Collections; + import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.methods.PostMethod; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; import org.junit.Test; -import java.io.IOException; -import java.net.SocketAddress; -import java.util.Collections; - -import static org.easymock.EasyMock.*; +import backtype.storm.metric.api.IMetricsConsumer; public class StormTimelineMetricsSinkTest { @Test @@ -65,4 +71,4 @@ public class StormTimelineMetricsSinkTest { Collections.singleton(new IMetricsConsumer.DataPoint("key1", 42))); verify(timelineMetricsCache, httpClient); } -} \ No newline at end of file +}