AMBARI-8522. Enable Flume metrics sink to AMS. (Szilard Nemethy via mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b42150ca Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b42150ca Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b42150ca Branch: refs/heads/trunk Commit: b42150cad1fb21d777519e029154614072a5b219 Parents: 9cdd4e2 Author: Myroslav Papirkovskyy <mpapyrkovs...@hortonworks.com> Authored: Fri Dec 5 21:34:58 2014 +0200 Committer: Myroslav Papirkovskyy <mpapyrkovs...@hortonworks.com> Committed: Fri Dec 5 21:42:06 2014 +0200 ---------------------------------------------------------------------- ambari-metrics/ambari-metrics-common/pom.xml | 81 +++++++ .../metrics2/sink/timeline/TimelineMetric.java | 172 +++++++++++++++ .../metrics2/sink/timeline/TimelineMetrics.java | 101 +++++++++ .../base/AbstractTimelineMetricsSink.java | 79 +++++++ .../timeline/cache/TimelineMetricsCache.java | 128 +++++++++++ .../timeline/configuration/Configuration.java | 62 ++++++ .../ambari-metrics-flume-sink/pom.xml | 181 ++++++++++++++++ .../src/main/assemblies/empty.xml | 21 ++ .../src/main/assemblies/sink.xml | 34 +++ .../src/main/conf/flume-metrics2.properties.j2 | 22 ++ .../sink/flume/FlumeTimelineMetricsSink.java | 176 ++++++++++++++++ .../flume/FlumeTimelineMetricsSinkTest.java | 117 ++++++++++ .../ambari-metrics-hadoop-sink/pom.xml | 12 +- .../conf/hadoop-metrics2-hbase.properties.j2 | 8 +- .../src/main/conf/hadoop-metrics2.properties.j2 | 2 +- .../timeline/AbstractTimelineMetricsSink.java | 101 --------- .../timeline/HadoopTimelineMetricsSink.java | 211 +++++++++++++++++++ .../metrics2/sink/timeline/TimelineMetric.java | 172 --------------- .../metrics2/sink/timeline/TimelineMetrics.java | 102 --------- .../sink/timeline/TimelineMetricsCache.java | 128 ----------- .../sink/timeline/TimelineMetricsSink.java | 211 ------------------- .../ambari-metrics-timelineservice/pom.xml | 2 +- ambari-metrics/pom.xml | 2 + 23 files changed, 1399 insertions(+), 726 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-common/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/pom.xml b/ambari-metrics/ambari-metrics-common/pom.xml new file mode 100644 index 0000000..786ad93 --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/pom.xml @@ -0,0 +1,81 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>ambari-metrics</artifactId> + <groupId>org.apache.ambari</groupId> + <version>0.1.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>ambari-metrics-common</artifactId> + <build> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.0</version> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.8</version> + <executions> + <execution> + <id>parse-version</id> + <phase>validate</phase> + <goals> + <goal>parse-version</goal> + </goals> + </execution> + <execution> + <id>regex-property</id> + <goals> + <goal>regex-property</goal> + </goals> + <configuration> + <name>ambariVersion</name> + <value>${project.version}</value> + <regex>^([0-9]+)\.([0-9]+)\.([0-9]+)(\.|-).*</regex> + <replacement>$1.$2.$3</replacement> + <failIfNoMatch>false</failIfNoMatch> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + <version>1.1.1</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>2.4.0</version> + </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + <version>1.8.0</version> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java new file mode 100644 index 0000000..68b4be8 --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.metrics2.sink.timeline; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.Map; +import java.util.TreeMap; + +@XmlRootElement(name = "metric") +@XmlAccessorType(XmlAccessType.NONE) +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class TimelineMetric implements Comparable<TimelineMetric> { + + private String metricName; + private String appId; + private String instanceId; + private String hostName; + private long timestamp; + private long startTime; + private String type; + private Map<Long, Double> metricValues = new TreeMap<Long, Double>(); + + @XmlElement(name = "metricname") + public String getMetricName() { + return metricName; + } + + public void setMetricName(String metricName) { + this.metricName = metricName; + } + + @XmlElement(name = "appid") + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + @XmlElement(name = "instanceid") + public String getInstanceId() { + return instanceId; + } + + public void setInstanceId(String instanceId) { + this.instanceId = instanceId; + } + + @XmlElement(name = "hostname") + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + @XmlElement(name = "timestamp") + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + @XmlElement(name = "starttime") + public long getStartTime() { + return startTime; + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + @XmlElement(name = "type") + public String getType() { + return type; + } + + public 
void setType(String type) { + this.type = type; + } + + @XmlElement(name = "metrics") + public Map<Long, Double> getMetricValues() { + return metricValues; + } + + public void setMetricValues(Map<Long, Double> metricValues) { + this.metricValues = metricValues; + } + + public void addMetricValues(Map<Long, Double> metricValues) { + this.metricValues.putAll(metricValues); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TimelineMetric metric = (TimelineMetric) o; + + if (!metricName.equals(metric.metricName)) return false; + if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null) + return false; + if (appId != null ? !appId.equals(metric.appId) : metric.appId != null) + return false; + if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null) + return false; + if (timestamp != metric.timestamp) return false; + if (startTime != metric.startTime) return false; + + return true; + } + + public boolean equalsExceptTime(TimelineMetric metric) { + if (!metricName.equals(metric.metricName)) return false; + if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null) + return false; + if (appId != null ? !appId.equals(metric.appId) : metric.appId != null) + return false; + if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null) + return false; + + return true; + } + + @Override + public int hashCode() { + int result = metricName.hashCode(); + result = 31 * result + (appId != null ? appId.hashCode() : 0); + result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0); + result = 31 * result + (hostName != null ? 
hostName.hashCode() : 0); + result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); + return result; + } + + @Override + public int compareTo(TimelineMetric other) { + if (timestamp > other.timestamp) { + return -1; + } else if (timestamp < other.timestamp) { + return 1; + } else { + return metricName.compareTo(other.metricName); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java new file mode 100644 index 0000000..4355fb1 --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.metrics2.sink.timeline; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.List; + +/** + * The class that hosts a list of timeline entities. + */ +@XmlRootElement(name = "metrics") +@XmlAccessorType(XmlAccessType.NONE) +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class TimelineMetrics { + + private List<TimelineMetric> allMetrics = new ArrayList<TimelineMetric>(); + + public TimelineMetrics() {} + + @XmlElement(name = "metrics") + public List<TimelineMetric> getMetrics() { + return allMetrics; + } + + public void setMetrics(List<TimelineMetric> allMetrics) { + this.allMetrics = allMetrics; + } + + private boolean isEqualTimelineMetrics(TimelineMetric metric1, + TimelineMetric metric2) { + + boolean isEqual = true; + + if (!metric1.getMetricName().equals(metric2.getMetricName())) { + return false; + } + + if (metric1.getHostName() != null) { + isEqual = metric1.getHostName().equals(metric2.getHostName()); + } + + if (metric1.getAppId() != null) { + isEqual = metric1.getAppId().equals(metric2.getAppId()); + } + + return isEqual; + } + + /** + * Merge with existing TimelineMetric if everything except startTime is + * the same. 
+ * @param metric {@link TimelineMetric} + */ + public void addOrMergeTimelineMetric(TimelineMetric metric) { + TimelineMetric metricToMerge = null; + + if (!allMetrics.isEmpty()) { + for (TimelineMetric timelineMetric : allMetrics) { + if (timelineMetric.equalsExceptTime(metric)) { + metricToMerge = timelineMetric; + break; + } + } + } + + if (metricToMerge != null) { + metricToMerge.addMetricValues(metric.getMetricValues()); + if (metricToMerge.getTimestamp() > metric.getTimestamp()) { + metricToMerge.setTimestamp(metric.getTimestamp()); + } + if (metricToMerge.getStartTime() > metric.getStartTime()) { + metricToMerge.setStartTime(metric.getStartTime()); + } + } else { + allMetrics.add(metric); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/base/AbstractTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/base/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/base/AbstractTimelineMetricsSink.java new file mode 100644 index 0000000..d51ee67 --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/base/AbstractTimelineMetricsSink.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.sink.timeline.base; + +import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.methods.PostMethod; +import org.apache.commons.httpclient.methods.StringRequestEntity; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.codehaus.jackson.map.AnnotationIntrospector; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.annotate.JsonSerialize; +import org.codehaus.jackson.xc.JaxbAnnotationIntrospector; + +import java.io.IOException; +import java.net.SocketAddress; + +public abstract class AbstractTimelineMetricsSink { + public static final String TAGS_FOR_PREFIX_PROPERTY_PREFIX = "tagsForPrefix."; + public static final String MAX_METRIC_ROW_CACHE_SIZE = "maxRowCacheSize"; + public static final String METRICS_SEND_INTERVAL = "sendInterval"; + public static final String COLLECTOR_HOST_PROPERTY = "collector"; + + protected final Log LOG = LogFactory.getLog(this.getClass()); + private HttpClient httpClient = new HttpClient(); + + protected static ObjectMapper mapper; + + static { + mapper = new ObjectMapper(); + AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(); + mapper.setAnnotationIntrospector(introspector); + mapper.getSerializationConfig() + .setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL); + } + + protected void emitMetrics(TimelineMetrics metrics) throws IOException { + String jsonData = 
mapper.writeValueAsString(metrics); + + SocketAddress socketAddress = getServerSocketAddress(); + + if (socketAddress != null) { + StringRequestEntity requestEntity = new StringRequestEntity(jsonData, "application/json", "UTF-8"); + + PostMethod postMethod = new PostMethod(getCollectorUri()); + postMethod.setRequestEntity(requestEntity); + int statusCode = httpClient.executeMethod(postMethod); + if (statusCode != 200) { + LOG.info("Unable to POST metrics to collector, " + getCollectorUri()); + } else { + LOG.debug("Metrics posted to Collector " + getCollectorUri()); + } + } + } + + public void setHttpClient(HttpClient httpClient) { + this.httpClient = httpClient; + } + + abstract protected SocketAddress getServerSocketAddress(); + + abstract protected String getCollectorUri(); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java new file mode 100644 index 0000000..06c3441 --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.sink.timeline.cache; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; + +import java.util.LinkedHashMap; +import java.util.Map; + +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class TimelineMetricsCache { + + private final TimelineMetricHolder timelineMetricCache = new TimelineMetricHolder(); + private static final Log LOG = LogFactory.getLog(TimelineMetric.class); + public static final int MAX_RECS_PER_NAME_DEFAULT = 10000; + public static final int MAX_EVICTION_TIME_MILLIS = 59000; // ~ 1 min + private final int maxRecsPerName; + private final int maxEvictionTimeInMillis; + + public TimelineMetricsCache(int maxRecsPerName, int maxEvictionTimeInMillis) { + this.maxRecsPerName = maxRecsPerName; + this.maxEvictionTimeInMillis = maxEvictionTimeInMillis; + } + + class TimelineMetricWrapper { + private long timeDiff = -1; + private long oldestTimestamp = -1; + private TimelineMetric timelineMetric; + + TimelineMetricWrapper(TimelineMetric timelineMetric) { + this.timelineMetric = timelineMetric; + this.oldestTimestamp = timelineMetric.getStartTime(); + } + + private void updateTimeDiff(long timestamp) { + if (oldestTimestamp != -1 && timestamp > oldestTimestamp) { + timeDiff = timestamp - oldestTimestamp; + } else { + oldestTimestamp = timestamp; + } + } + + public void 
putMetric(TimelineMetric metric) { + this.timelineMetric.addMetricValues(metric.getMetricValues()); + updateTimeDiff(metric.getStartTime()); + } + + public long getTimeDiff() { + return timeDiff; + } + + public TimelineMetric getTimelineMetric() { + return timelineMetric; + } + } + + // TODO: Change to ConcurentHashMap with weighted eviction + class TimelineMetricHolder extends LinkedHashMap<String, TimelineMetricWrapper> { + private static final long serialVersionUID = 1L; + private boolean gotOverflow = false; + + @Override + protected boolean removeEldestEntry(Map.Entry<String, TimelineMetricWrapper> eldest) { + boolean overflow = size() > maxRecsPerName; + if (overflow && !gotOverflow) { + LOG.warn("Metrics cache overflow at "+ size() +" for "+ eldest); + gotOverflow = true; + } + return overflow; + } + + public TimelineMetric evict(String metricName) { + TimelineMetricWrapper metricWrapper = this.get(metricName); + + if (metricWrapper == null + || metricWrapper.getTimeDiff() < maxEvictionTimeInMillis) { + return null; + } + + TimelineMetric timelineMetric = metricWrapper.getTimelineMetric(); + this.remove(metricName); + + return timelineMetric; + } + + public void put(String metricName, TimelineMetric timelineMetric) { + + TimelineMetricWrapper metric = this.get(metricName); + if (metric == null) { + this.put(metricName, new TimelineMetricWrapper(timelineMetric)); + } else { + metric.putMetric(timelineMetric); + } + } + } + + public TimelineMetric getTimelineMetric(String metricName) { + if (timelineMetricCache.containsKey(metricName)) { + return timelineMetricCache.evict(metricName); + } + + return null; + } + + public void putTimelineMetric(TimelineMetric timelineMetric) { + timelineMetricCache.put(timelineMetric.getMetricName(), timelineMetric); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/configuration/Configuration.java 
---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/configuration/Configuration.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/configuration/Configuration.java new file mode 100644 index 0000000..940ea75 --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/configuration/Configuration.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.metrics2.sink.timeline.configuration; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +public class Configuration { + public final Log LOG = LogFactory.getLog(this.getClass()); + private final Properties properties; + + public Configuration(String configFile) { + properties = new Properties(); + + //Get property file stream from classpath + InputStream inputStream = Configuration.class.getResourceAsStream(configFile); + + if (inputStream == null) { + throw new IllegalArgumentException(configFile + " not found in classpath"); + } + + // load the properties + try { + properties.load(inputStream); + inputStream.close(); + } catch (FileNotFoundException fnf) { + LOG.info("No configuration file " + configFile + " found in classpath.", fnf); + } catch (IOException ie) { + throw new IllegalArgumentException("Can't read configuration file " + + configFile, ie); + } + } + + public String getProperty(String key) { + return properties.getProperty(key); + } + + public String getProperty(String key, String defaultValue) { + return properties.getProperty(key, defaultValue); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-flume-sink/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-flume-sink/pom.xml b/ambari-metrics/ambari-metrics-flume-sink/pom.xml new file mode 100644 index 0000000..3021438 --- /dev/null +++ b/ambari-metrics/ambari-metrics-flume-sink/pom.xml @@ -0,0 +1,181 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. 
+The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 + http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>ambari-metrics</artifactId> + <groupId>org.apache.ambari</groupId> + <version>0.1.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>ambari-metrics-flume-sink</artifactId> + <version>0.1.0-SNAPSHOT</version> + <packaging>jar</packaging> + <build> + <plugins> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/lib</outputDirectory> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptors> + <descriptor>src/main/assemblies/sink.xml</descriptor> + </descriptors> + <tarLongFileMode>gnu</tarLongFileMode> + </configuration> + <executions> + <execution> + <id>build-tarball</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.0</version> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + 
<artifactId>build-helper-maven-plugin</artifactId> + <version>1.8</version> + <executions> + <execution> + <id>parse-version</id> + <phase>validate</phase> + <goals> + <goal>parse-version</goal> + </goals> + </execution> + <execution> + <id>regex-property</id> + <goals> + <goal>regex-property</goal> + </goals> + <configuration> + <name>ambariVersion</name> + <value>${project.version}</value> + <regex>^([0-9]+)\.([0-9]+)\.([0-9]+)(\.|-).*</regex> + <replacement>$1.$2.$3</replacement> + <failIfNoMatch>false</failIfNoMatch> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>com.github.goldin</groupId> + <artifactId>copy-maven-plugin</artifactId> + <version>0.2.5</version> + <executions> + <execution> + <id>create-archive</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>rpm-maven-plugin</artifactId> + <version>2.0.1</version> + <executions> + <execution> + <phase>none</phase> + <goals> + <goal>rpm</goal> + </goals> + </execution> + </executions> + <configuration> + <name>ambari-metrics-flume-sink</name> + <copyright>2012, Apache Software Foundation</copyright> + <group>Development</group> + <description>Maven Recipe: RPM Package.</description> + <mappings> + <mapping> + <directory>/usr/lib/flume/lib</directory> + <filemode>644</filemode> + <username>root</username> + <groupname>root</groupname> + <sources> + <source> + <location>target/${project.artifactId}-${project.version}.jar</location> + </source> + <source> + <location>target/lib</location> + </source> + </sources> + </mapping> + </mappings> + </configuration> + </plugin> + </plugins> + </build> + <dependencies> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-core</artifactId> + <version>1.5.1</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.ambari</groupId> + <artifactId>ambari-metrics-common</artifactId> + 
<version>0.1.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + <version>4.10</version> + </dependency> + <dependency> + <groupId>org.easymock</groupId> + <artifactId>easymock</artifactId> + <version>3.2</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-easymock</artifactId> + <version>1.4.9</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4</artifactId> + <version>1.4.9</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-flume-sink/src/main/assemblies/empty.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/assemblies/empty.xml b/ambari-metrics/ambari-metrics-flume-sink/src/main/assemblies/empty.xml new file mode 100644 index 0000000..35738b1 --- /dev/null +++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/assemblies/empty.xml @@ -0,0 +1,21 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<assembly> + <id>empty</id> + <formats/> +</assembly> http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-flume-sink/src/main/assemblies/sink.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/assemblies/sink.xml b/ambari-metrics/ambari-metrics-flume-sink/src/main/assemblies/sink.xml new file mode 100644 index 0000000..21a6b36 --- /dev/null +++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/assemblies/sink.xml @@ -0,0 +1,34 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<assembly> + <!--This 'all' id is not appended to the produced bundle because we do this: + http://maven.apache.org/plugins/maven-assembly-plugin/faq.html#required-classifiers + --> + <id>dist</id> + <formats> + <format>dir</format> + <format>tar.gz</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <files> + <file> + <source>${project.build.directory}/${artifact.artifactId}-${artifact.version}.jar</source> + <outputDirectory>ambari-metrics-${project.version}/lib/ambari-metrics</outputDirectory> + </file> + </files> +</assembly> http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2 ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2 b/ambari-metrics/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2 new file mode 100644 index 0000000..7458bf8 --- /dev/null +++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2 @@ -0,0 +1,22 @@ +{# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#} + +collector={{metric_collector_host}}:8188 +collectionFrequency=60000 +maxRowCacheSize=10000 +sendInterval=59000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java new file mode 100644 index 0000000..87c4ab8 --- /dev/null +++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.metrics2.sink.flume; + +import org.apache.commons.lang.ClassUtils; +import org.apache.commons.lang.math.NumberUtils; +import org.apache.flume.Context; +import org.apache.flume.FlumeException; +import org.apache.flume.instrumentation.MonitorService; +import org.apache.flume.instrumentation.util.JMXPollUtil; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.hadoop.metrics2.sink.timeline.base.AbstractTimelineMetricsSink; +import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; +import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration; +import org.apache.hadoop.metrics2.util.Servers; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implements MonitorService { + private SocketAddress socketAddress; + private String collectorUri; + private TimelineMetricsCache metricsCache; + private ScheduledExecutorService scheduledExecutorService; + private long pollFrequency; + private String hostname; + + @Override + public void start() { + LOG.info("Starting Flume Metrics Sink"); + TimelineMetricsCollector timelineMetricsCollector = new TimelineMetricsCollector(); + if (scheduledExecutorService == null || scheduledExecutorService.isShutdown() || scheduledExecutorService.isTerminated()) { + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + } + scheduledExecutorService.scheduleWithFixedDelay(timelineMetricsCollector, 0, + pollFrequency, TimeUnit.MILLISECONDS); + } + + @Override + public void 
stop() { + LOG.info("Stopping Flume Metrics Sink"); + scheduledExecutorService.shutdown(); + } + + @Override + public void configure(Context context) { + LOG.info("Context parameters " + context); + try { + hostname = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + LOG.error("Could not identify hostname."); + throw new FlumeException("Could not identify hostname.", e); + } + Configuration configuration = new Configuration("/flume-metrics2.properties"); + int maxRowCacheSize = Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE, + String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT))); + int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL, + String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS))); + metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval); + collectorUri = "http://" + configuration.getProperty(COLLECTOR_HOST_PROPERTY) + "/ws/v1/timeline/metrics"; + List<InetSocketAddress> socketAddresses = + Servers.parse(configuration.getProperty(configuration.getProperty(COLLECTOR_HOST_PROPERTY)), 8188); + if (socketAddresses != null && !socketAddresses.isEmpty()) { + socketAddress = socketAddresses.get(0); + } + pollFrequency = Long.parseLong(configuration.getProperty("collectionFrequency")); + } + + @Override + public SocketAddress getServerSocketAddress() { + return socketAddress; + } + + @Override + public String getCollectorUri() { + return collectorUri; + } + + public void setPollFrequency(long pollFrequency) { + this.pollFrequency = pollFrequency; + } + + public void setMetricsCache(TimelineMetricsCache metricsCache) { + this.metricsCache = metricsCache; + } + + /** + * Worker which polls JMX for all mbeans with + * {@link javax.management.ObjectName} within the flume namespace: + * org.apache.flume. All attributes of such beans are sent + * to the metrics collector service. 
+ */ + private class TimelineMetricsCollector implements Runnable { + @Override + public void run() { + LOG.debug("Collecting Metrics for Flume"); + try { + Map<String, Map<String, String>> metricsMap = + JMXPollUtil.getAllMBeans(); + long currentTimeMillis = System.currentTimeMillis(); + for (String component : metricsMap.keySet()) { + Map<String, String> attributeMap = metricsMap.get(component); + LOG.info("Attributes for component " + component); + processComponentAttributes(currentTimeMillis, component, attributeMap); + } + } catch (Exception e) { + LOG.error("Unexpected error", e); + } + LOG.debug("Finished collecting Metrics for Flume"); + } + + private void processComponentAttributes(long currentTimeMillis, String component, Map<String, String> attributeMap) throws IOException { + List<TimelineMetric> metricList = new ArrayList<TimelineMetric>(); + for (String attributeName : attributeMap.keySet()) { + String attributeValue = attributeMap.get(attributeName); + if (NumberUtils.isNumber(attributeValue)) { + LOG.info(attributeName + " = " + attributeValue); + TimelineMetric timelineMetric = createTimelineMetric(currentTimeMillis, + component, attributeName, attributeValue); + // Put intermediate values into the cache until it is time to send + metricsCache.putTimelineMetric(timelineMetric); + + TimelineMetric cachedMetric = metricsCache.getTimelineMetric(attributeName); + + if (cachedMetric != null) { + metricList.add(cachedMetric); + } + } + } + + if (!metricList.isEmpty()) { + TimelineMetrics timelineMetrics = new TimelineMetrics(); + timelineMetrics.setMetrics(metricList); + emitMetrics(timelineMetrics); + } + } + + private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName, String attributeValue) { + TimelineMetric timelineMetric = new TimelineMetric(); + timelineMetric.setMetricName(attributeName); + timelineMetric.setHostName(hostname); + timelineMetric.setAppId("flume." 
+ component); + timelineMetric.setStartTime(currentTimeMillis); + timelineMetric.setType(ClassUtils.getShortCanonicalName( + attributeValue, "Number")); + timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue)); + return timelineMetric; + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-flume-sink/src/test/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/test/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-flume-sink/src/test/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java new file mode 100644 index 0000000..0275db6 --- /dev/null +++ b/ambari-metrics/ambari-metrics-flume-sink/src/test/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package org.apache.hadoop.metrics2.sink.flume;

import org.apache.commons.httpclient.HttpClient;
import org.apache.flume.instrumentation.util.JMXPollUtil;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
import org.easymock.EasyMock;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.Collections;

import static org.powermock.api.easymock.PowerMock.*;

/**
 * Tests for {@link FlumeTimelineMetricsSink} that replace the static
 * {@link JMXPollUtil#getAllMBeans()} call with a PowerMock stub and run the
 * sink's background collector for a few milliseconds.
 *
 * NOTE(review): every test relies on {@code Thread.sleep(5)} being long enough
 * for the collector thread to run at least once; confirm this is not flaky on
 * slow build hosts.
 * NOTE(review): the cache and HTTP client mocks are created with EasyMock
 * rather than PowerMock; confirm that {@code verifyAll()} actually verifies them.
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest(JMXPollUtil.class)
public class FlumeTimelineMetricsSinkTest {
  /**
   * Polls a bean map whose only attribute value is the non-numeric string
   * "value1".
   */
  @Test
  public void testNonNumericMetricMetricExclusion() throws InterruptedException {
    FlumeTimelineMetricsSink flumeTimelineMetricsSink = new FlumeTimelineMetricsSink();
    TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(flumeTimelineMetricsSink);
    flumeTimelineMetricsSink.setPollFrequency(1);
    HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class);
    flumeTimelineMetricsSink.setHttpClient(httpClient);
    mockStatic(JMXPollUtil.class);
    EasyMock.expect(JMXPollUtil.getAllMBeans()).andReturn(
        Collections.singletonMap("component1", Collections.singletonMap("key1", "value1"))).once();
    replay(JMXPollUtil.class, timelineMetricsCache, httpClient);
    flumeTimelineMetricsSink.start();
    Thread.sleep(5);
    flumeTimelineMetricsSink.stop();
    verifyAll();
  }

  /**
   * Polls a bean map whose only attribute value is the numeric string "42".
   */
  @Test
  public void testNumericMetricMetricSubmission() throws InterruptedException {
    FlumeTimelineMetricsSink flumeTimelineMetricsSink = new FlumeTimelineMetricsSink();
    TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(flumeTimelineMetricsSink);
    flumeTimelineMetricsSink.setPollFrequency(1);
    HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class);
    flumeTimelineMetricsSink.setHttpClient(httpClient);
    mockStatic(JMXPollUtil.class);
    EasyMock.expect(JMXPollUtil.getAllMBeans()).andReturn(
        Collections.singletonMap("component1", Collections.singletonMap("key1", "42"))).once();
    replay(JMXPollUtil.class, timelineMetricsCache, httpClient);
    flumeTimelineMetricsSink.start();
    Thread.sleep(5);
    flumeTimelineMetricsSink.stop();
    verifyAll();
  }

  /**
   * Creates a nice mock cache, installs it into the given sink and records one
   * expected {@code getTimelineMetric("key1")} call (returning an empty metric)
   * and one expected {@code putTimelineMetric} call. The mock is returned still
   * in record state; callers are responsible for replaying it.
   */
  private TimelineMetricsCache getTimelineMetricsCache(FlumeTimelineMetricsSink flumeTimelineMetricsSink) {
    TimelineMetricsCache timelineMetricsCache = EasyMock.createNiceMock(TimelineMetricsCache.class);
    flumeTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
    EasyMock.expect(timelineMetricsCache.getTimelineMetric("key1"))
        .andReturn(new TimelineMetric()).once();
    timelineMetricsCache.putTimelineMetric(EasyMock.anyObject(TimelineMetric.class));
    EasyMock.expectLastCall().once();
    return timelineMetricsCache;
  }

  /**
   * Starts and stops the sink once, then starts it again and polls a numeric
   * attribute, exercising a restart after {@code stop()}.
   *
   * NOTE(review): the first start()/stop() pair runs before replay(), so the
   * collector thread may call the mocks while they are still in record state;
   * confirm this is intended.
   */
  @Test
  public void testMonitorRestart() throws InterruptedException {
    FlumeTimelineMetricsSink flumeTimelineMetricsSink = new FlumeTimelineMetricsSink();
    TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(flumeTimelineMetricsSink);
    flumeTimelineMetricsSink.setPollFrequency(1);
    HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class);
    flumeTimelineMetricsSink.setHttpClient(httpClient);
    mockStatic(JMXPollUtil.class);
    EasyMock.expect(JMXPollUtil.getAllMBeans()).andReturn(
        Collections.singletonMap("component1", Collections.singletonMap("key1", "42"))).once();
    flumeTimelineMetricsSink.start();
    flumeTimelineMetricsSink.stop();
    replay(JMXPollUtil.class, timelineMetricsCache, httpClient);
    flumeTimelineMetricsSink.start();
    Thread.sleep(5);
    flumeTimelineMetricsSink.stop();
    verifyAll();
  }

  /**
   * Makes the JMX poll throw a RuntimeException and checks that start/stop of
   * the sink still completes.
   */
  @Test
  public void testMetricsRetrievalExceptionTolerance() throws InterruptedException {
    FlumeTimelineMetricsSink flumeTimelineMetricsSink = new FlumeTimelineMetricsSink();
    TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(flumeTimelineMetricsSink);
    flumeTimelineMetricsSink.setPollFrequency(1);
    HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class);
    flumeTimelineMetricsSink.setHttpClient(httpClient);
    mockStatic(JMXPollUtil.class);
    EasyMock.expect(JMXPollUtil.getAllMBeans()).
        andThrow(new RuntimeException("Failed to retrieve Flume Properties")).once();
    replay(JMXPollUtil.class, timelineMetricsCache, httpClient);
    flumeTimelineMetricsSink.start();
    Thread.sleep(5);
    flumeTimelineMetricsSink.stop();
    verifyAll();
  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml b/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml
index 0397e2e..1e854e2 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/pom.xml
@@ -133,6 +133,11 @@ limitations under the License.
 <dependencies>
 <dependency>
+ <groupId>org.apache.ambari</groupId>
+ <artifactId>ambari-metrics-common</artifactId>
+ <version>0.1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-common</artifactId>
 <version>2.4.0</version>
@@ -176,12 +181,7 @@ limitations under the License.
<dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-core-asl</artifactId> - <version>1.9.9</version> - </dependency> - <dependency> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-mapper-asl</artifactId> - <version>1.9.13</version> + <version>1.8.0</version> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2-hbase.properties.j2 ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2-hbase.properties.j2 b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2-hbase.properties.j2 index 6e64421..0d13498 100644 --- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2-hbase.properties.j2 +++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2-hbase.properties.j2 @@ -29,21 +29,21 @@ hbase.extendedperiod = 3600 # Configuration of the "hbase" context for timeline metrics service -hbase.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink +hbase.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink hbase.period=10 hbase.collector={{timeline_server_hosts}}:8188 # Configuration of the "jvm" context for timeline metrics service -jvm.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink +jvm.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink jvm.period=10 jvm.collector={{timeline_server_hosts}}:8188 # Configuration of the "rpc" context for timeline metrics service -rpc.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink +rpc.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink rpc.period=10 rpc.collector={{timeline_server_hosts}}:8188 # Following hadoop example -hbase.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink 
+hbase.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink hbase.sink.timeline.period=10 hbase.sink.timeline.collector={{timeline_server_hosts}}:8188 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2.properties.j2 ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2.properties.j2 b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2.properties.j2 index 7a00a7e..76a00d1 100644 --- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2.properties.j2 +++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/conf/hadoop-metrics2.properties.j2 @@ -37,7 +37,7 @@ {% if has_ganglia_server %} *.period=60 -*.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsSink +*.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink *.sink.timeline.period=10 http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java deleted file mode 100644 index 2c42274..0000000 --- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.metrics2.sink.timeline; - -import org.apache.commons.configuration.SubsetConfiguration; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.metrics2.MetricsSink; -import org.apache.hadoop.metrics2.util.Servers; -import org.apache.hadoop.net.DNS; -import java.net.SocketAddress; -import java.net.UnknownHostException; -import java.util.List; - -public abstract class AbstractTimelineMetricsSink implements MetricsSink { - - public final Log LOG = LogFactory.getLog(this.getClass()); - - private SubsetConfiguration conf; - private String hostName = "UNKNOWN.example.com"; - private String serviceName = ""; - private final String COLLECTOR_HOST_PROPERTY = "collector"; - private final int DEFAULT_PORT = 8188; - - private List<? extends SocketAddress> metricsServers; - private String collectorUri; - - @Override - public void init(SubsetConfiguration conf) { - this.conf = conf; - LOG.info("Initializing Timeline metrics sink."); - - // Take the hostname from the DNS class. 
- if (conf.getString("slave.host.name") != null) { - hostName = conf.getString("slave.host.name"); - } else { - try { - hostName = DNS.getDefaultHost( - conf.getString("dfs.datanode.dns.interface", "default"), - conf.getString("dfs.datanode.dns.nameserver", "default")); - } catch (UnknownHostException uhe) { - LOG.error(uhe); - hostName = "UNKNOWN.example.com"; - } - } - - serviceName = getFirstConfigPrefix(conf); - - // Load collector configs - metricsServers = Servers.parse(conf.getString(COLLECTOR_HOST_PROPERTY), - DEFAULT_PORT); - - if (metricsServers == null || metricsServers.isEmpty()) { - LOG.error("No Metric collector configured."); - } else { - collectorUri = "http://" + conf.getString(COLLECTOR_HOST_PROPERTY).trim() - + "/ws/v1/timeline/metrics"; - } - } - - protected String getHostName() { - return hostName; - } - - protected String getServiceName() { - return serviceName; - } - - private String getFirstConfigPrefix(SubsetConfiguration conf) { - while (conf.getParent() instanceof SubsetConfiguration) { - conf = (SubsetConfiguration) conf.getParent(); - } - return conf.getPrefix(); - } - - protected SocketAddress getServerSocketAddress() { - if (metricsServers != null && !metricsServers.isEmpty()) { - return metricsServers.get(0); - } - return null; - } - - protected String getCollectorUri() { - return collectorUri; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java new file mode 100644 index 0000000..8fcf464 --- /dev/null +++ 
b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.sink.timeline; + +import org.apache.commons.configuration.SubsetConfiguration; +import org.apache.commons.lang.ClassUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.metrics2.*; +import org.apache.hadoop.metrics2.impl.MsInfo; +import org.apache.hadoop.metrics2.sink.timeline.base.AbstractTimelineMetricsSink; +import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; +import org.apache.hadoop.metrics2.util.Servers; +import org.apache.hadoop.net.DNS; + +import java.io.IOException; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.util.*; + +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink implements MetricsSink { + private Map<String, Set<String>> useTagsMap = new HashMap<String, Set<String>>(); + private TimelineMetricsCache metricsCache; + private 
String hostName = "UNKNOWN.example.com"; + private String serviceName = ""; + private List<? extends SocketAddress> metricsServers; + private String collectorUri; + + @Override + public void init(SubsetConfiguration conf) { + LOG.info("Initializing Timeline metrics sink."); + + // Take the hostname from the DNS class. + if (conf.getString("slave.host.name") != null) { + hostName = conf.getString("slave.host.name"); + } else { + try { + hostName = DNS.getDefaultHost( + conf.getString("dfs.datanode.dns.interface", "default"), + conf.getString("dfs.datanode.dns.nameserver", "default")); + } catch (UnknownHostException uhe) { + LOG.error(uhe); + hostName = "UNKNOWN.example.com"; + } + } + + serviceName = getFirstConfigPrefix(conf); + + // Load collector configs + metricsServers = Servers.parse(conf.getString(COLLECTOR_HOST_PROPERTY), 8188); + + if (metricsServers == null || metricsServers.isEmpty()) { + LOG.error("No Metric collector configured."); + } else { + collectorUri = "http://" + conf.getString(COLLECTOR_HOST_PROPERTY).trim() + + "/ws/v1/timeline/metrics"; + } + + int maxRowCacheSize = conf.getInt(MAX_METRIC_ROW_CACHE_SIZE, + TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT); + int metricsSendInterval = conf.getInt(METRICS_SEND_INTERVAL, + TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS); // ~ 1 min + metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval); + + conf.setListDelimiter(','); + Iterator<String> it = (Iterator<String>) conf.getKeys(); + while (it.hasNext()) { + String propertyName = it.next(); + if (propertyName != null && propertyName.startsWith(TAGS_FOR_PREFIX_PROPERTY_PREFIX)) { + String contextName = propertyName.substring(TAGS_FOR_PREFIX_PROPERTY_PREFIX.length()); + String[] tags = conf.getStringArray(propertyName); + boolean useAllTags = false; + Set<String> set = null; + if (tags.length > 0) { + set = new HashSet<String>(); + for (String tag : tags) { + tag = tag.trim(); + useAllTags |= tag.equals("*"); + if (tag.length() > 
0) { + set.add(tag); + } + } + if (useAllTags) { + set = null; + } + } + useTagsMap.put(contextName, set); + } + } + } + + private String getFirstConfigPrefix(SubsetConfiguration conf) { + while (conf.getParent() instanceof SubsetConfiguration) { + conf = (SubsetConfiguration) conf.getParent(); + } + return conf.getPrefix(); + } + + protected SocketAddress getServerSocketAddress() { + if (metricsServers != null && !metricsServers.isEmpty()) { + return metricsServers.get(0); + } + return null; + } + + @Override + protected String getCollectorUri() { + return collectorUri; + } + + @Override + public void putMetrics(MetricsRecord record) { + try { + String recordName = record.name(); + String contextName = record.context(); + + StringBuilder sb = new StringBuilder(); + sb.append(contextName); + sb.append('.'); + sb.append(recordName); + + appendPrefix(record, sb); + sb.append("."); + int sbBaseLen = sb.length(); + + Collection<AbstractMetric> metrics = + (Collection<AbstractMetric>) record.metrics(); + + List<TimelineMetric> metricList = new ArrayList<TimelineMetric>(); + + for (AbstractMetric metric : metrics) { + sb.append(metric.name()); + String name = sb.toString(); + TimelineMetric timelineMetric = new TimelineMetric(); + timelineMetric.setMetricName(name); + timelineMetric.setHostName(hostName); + timelineMetric.setAppId(serviceName); + timelineMetric.setStartTime(record.timestamp()); + timelineMetric.setType(ClassUtils.getShortCanonicalName( + metric.value(), "Number")); + timelineMetric.getMetricValues().put(record.timestamp(), + metric.value().doubleValue()); + // Put intermediate values into the cache until it is time to send + metricsCache.putTimelineMetric(timelineMetric); + + // Retrieve all values from cache if it is time to send + TimelineMetric cachedMetric = metricsCache.getTimelineMetric(name); + + if (cachedMetric != null) { + metricList.add(cachedMetric); + } + + sb.setLength(sbBaseLen); + } + + TimelineMetrics timelineMetrics = new 
TimelineMetrics(); + timelineMetrics.setMetrics(metricList); + + if (!metricList.isEmpty()) { + emitMetrics(timelineMetrics); + } + } catch (IOException io) { + throw new MetricsException("Failed to putMetrics", io); + } + } + + // Taken as is from Ganglia30 implementation + @InterfaceAudience.Private + public void appendPrefix(MetricsRecord record, StringBuilder sb) { + String contextName = record.context(); + Collection<MetricsTag> tags = record.tags(); + if (useTagsMap.containsKey(contextName)) { + Set<String> useTags = useTagsMap.get(contextName); + for (MetricsTag t : tags) { + if (useTags == null || useTags.contains(t.name())) { + + // the context is always skipped here because it is always added + + // the hostname is always skipped to avoid case-mismatches + // from different DNSes. + + if (t.info() != MsInfo.Context && t.info() != MsInfo.Hostname && t.value() != null) { + sb.append('.').append(t.name()).append('=').append(t.value()); + } + } + } + } + } + + @Override + public void flush() { + // TODO: Buffering implementation + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java deleted file mode 100644 index 68b4be8..0000000 --- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java +++ /dev/null @@ -1,172 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.metrics2.sink.timeline; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import javax.xml.bind.annotation.XmlAccessType; -import javax.xml.bind.annotation.XmlAccessorType; -import javax.xml.bind.annotation.XmlElement; -import javax.xml.bind.annotation.XmlRootElement; -import java.util.Map; -import java.util.TreeMap; - -@XmlRootElement(name = "metric") -@XmlAccessorType(XmlAccessType.NONE) -@InterfaceAudience.Public -@InterfaceStability.Unstable -public class TimelineMetric implements Comparable<TimelineMetric> { - - private String metricName; - private String appId; - private String instanceId; - private String hostName; - private long timestamp; - private long startTime; - private String type; - private Map<Long, Double> metricValues = new TreeMap<Long, Double>(); - - @XmlElement(name = "metricname") - public String getMetricName() { - return metricName; - } - - public void setMetricName(String metricName) { - this.metricName = metricName; - } - - @XmlElement(name = "appid") - public String getAppId() { - return appId; - } - - public void setAppId(String appId) { - this.appId = appId; - } - - @XmlElement(name = "instanceid") - public String getInstanceId() { - return instanceId; - } - - public void setInstanceId(String instanceId) { - this.instanceId = instanceId; - } - - 
@XmlElement(name = "hostname") - public String getHostName() { - return hostName; - } - - public void setHostName(String hostName) { - this.hostName = hostName; - } - - @XmlElement(name = "timestamp") - public long getTimestamp() { - return timestamp; - } - - public void setTimestamp(long timestamp) { - this.timestamp = timestamp; - } - - @XmlElement(name = "starttime") - public long getStartTime() { - return startTime; - } - - public void setStartTime(long startTime) { - this.startTime = startTime; - } - - @XmlElement(name = "type") - public String getType() { - return type; - } - - public void setType(String type) { - this.type = type; - } - - @XmlElement(name = "metrics") - public Map<Long, Double> getMetricValues() { - return metricValues; - } - - public void setMetricValues(Map<Long, Double> metricValues) { - this.metricValues = metricValues; - } - - public void addMetricValues(Map<Long, Double> metricValues) { - this.metricValues.putAll(metricValues); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - TimelineMetric metric = (TimelineMetric) o; - - if (!metricName.equals(metric.metricName)) return false; - if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null) - return false; - if (appId != null ? !appId.equals(metric.appId) : metric.appId != null) - return false; - if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null) - return false; - if (timestamp != metric.timestamp) return false; - if (startTime != metric.startTime) return false; - - return true; - } - - public boolean equalsExceptTime(TimelineMetric metric) { - if (!metricName.equals(metric.metricName)) return false; - if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null) - return false; - if (appId != null ? !appId.equals(metric.appId) : metric.appId != null) - return false; - if (instanceId != null ? 
!instanceId.equals(metric.instanceId) : metric.instanceId != null) - return false; - - return true; - } - - @Override - public int hashCode() { - int result = metricName.hashCode(); - result = 31 * result + (appId != null ? appId.hashCode() : 0); - result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0); - result = 31 * result + (hostName != null ? hostName.hashCode() : 0); - result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); - return result; - } - - @Override - public int compareTo(TimelineMetric other) { - if (timestamp > other.timestamp) { - return -1; - } else if (timestamp < other.timestamp) { - return 1; - } else { - return metricName.compareTo(other.metricName); - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java deleted file mode 100644 index a6c925a..0000000 --- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.metrics2.sink.timeline; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; - -import javax.xml.bind.annotation.XmlAccessType; -import javax.xml.bind.annotation.XmlAccessorType; -import javax.xml.bind.annotation.XmlElement; -import javax.xml.bind.annotation.XmlRootElement; -import java.util.ArrayList; -import java.util.List; - -/** - * The class that hosts a list of timeline entities. - */ -@XmlRootElement(name = "metrics") -@XmlAccessorType(XmlAccessType.NONE) -@InterfaceAudience.Public -@InterfaceStability.Unstable -public class TimelineMetrics { - - private List<TimelineMetric> allMetrics = new ArrayList<TimelineMetric>(); - - public TimelineMetrics() {} - - @XmlElement(name = "metrics") - public List<TimelineMetric> getMetrics() { - return allMetrics; - } - - public void setMetrics(List<TimelineMetric> allMetrics) { - this.allMetrics = allMetrics; - } - - private boolean isEqualTimelineMetrics(TimelineMetric metric1, - TimelineMetric metric2) { - - boolean isEqual = true; - - if (!metric1.getMetricName().equals(metric2.getMetricName())) { - return false; - } - - if (metric1.getHostName() != null) { - isEqual = metric1.getHostName().equals(metric2.getHostName()); - } - - if (metric1.getAppId() != null) { - isEqual = metric1.getAppId().equals(metric2.getAppId()); - } - - return isEqual; - } - - /** - * Merge with existing TimelineMetric if everything except startTime is - * the same. 
- * @param metric {@link TimelineMetric} - */ - public void addOrMergeTimelineMetric(TimelineMetric metric) { - TimelineMetric metricToMerge = null; - - if (!allMetrics.isEmpty()) { - for (TimelineMetric timelineMetric : allMetrics) { - if (timelineMetric.equalsExceptTime(metric)) { - metricToMerge = timelineMetric; - break; - } - } - } - - if (metricToMerge != null) { - metricToMerge.addMetricValues(metric.getMetricValues()); - if (metricToMerge.getTimestamp() > metric.getTimestamp()) { - metricToMerge.setTimestamp(metric.getTimestamp()); - } - if (metricToMerge.getStartTime() > metric.getStartTime()) { - metricToMerge.setStartTime(metric.getStartTime()); - } - } else { - allMetrics.add(metric); - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricsCache.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricsCache.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricsCache.java deleted file mode 100644 index 36aaec2..0000000 --- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricsCache.java +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.metrics2.sink.timeline; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; - -import java.util.LinkedHashMap; -import java.util.Map; - -@InterfaceAudience.Public -@InterfaceStability.Evolving -public class TimelineMetricsCache { - - private final TimelineMetricHolder timelineMetricCache = new TimelineMetricHolder(); - private static final Log LOG = LogFactory.getLog(TimelineMetric.class); - static final int MAX_RECS_PER_NAME_DEFAULT = 10000; - static final int MAX_EVICTION_TIME_MILLIS = 59000; // ~ 1 min - private final int maxRecsPerName; - private final int maxEvictionTimeInMillis; - - TimelineMetricsCache(int maxRecsPerName, int maxEvictionTimeInMillis) { - this.maxRecsPerName = maxRecsPerName; - this.maxEvictionTimeInMillis = maxEvictionTimeInMillis; - } - - class TimelineMetricWrapper { - private long timeDiff = -1; - private long oldestTimestamp = -1; - private TimelineMetric timelineMetric; - - TimelineMetricWrapper(TimelineMetric timelineMetric) { - this.timelineMetric = timelineMetric; - this.oldestTimestamp = timelineMetric.getStartTime(); - } - - private void updateTimeDiff(long timestamp) { - if (oldestTimestamp != -1 && timestamp > oldestTimestamp) { - timeDiff = timestamp - oldestTimestamp; - } else { - oldestTimestamp = timestamp; - } - } - - public void putMetric(TimelineMetric metric) 
{ - this.timelineMetric.addMetricValues(metric.getMetricValues()); - updateTimeDiff(metric.getStartTime()); - } - - public long getTimeDiff() { - return timeDiff; - } - - public TimelineMetric getTimelineMetric() { - return timelineMetric; - } - } - - // TODO: Change to ConcurentHashMap with weighted eviction - class TimelineMetricHolder extends LinkedHashMap<String, TimelineMetricWrapper> { - private static final long serialVersionUID = 1L; - private boolean gotOverflow = false; - - @Override - protected boolean removeEldestEntry(Map.Entry<String, TimelineMetricWrapper> eldest) { - boolean overflow = size() > maxRecsPerName; - if (overflow && !gotOverflow) { - LOG.warn("Metrics cache overflow at "+ size() +" for "+ eldest); - gotOverflow = true; - } - return overflow; - } - - public TimelineMetric evict(String metricName) { - TimelineMetricWrapper metricWrapper = this.get(metricName); - - if (metricWrapper == null - || metricWrapper.getTimeDiff() < maxEvictionTimeInMillis) { - return null; - } - - TimelineMetric timelineMetric = metricWrapper.getTimelineMetric(); - this.remove(metricName); - - return timelineMetric; - } - - public void put(String metricName, TimelineMetric timelineMetric) { - - TimelineMetricWrapper metric = this.get(metricName); - if (metric == null) { - this.put(metricName, new TimelineMetricWrapper(timelineMetric)); - } else { - metric.putMetric(timelineMetric); - } - } - } - - public TimelineMetric getTimelineMetric(String metricName) { - if (timelineMetricCache.containsKey(metricName)) { - return timelineMetricCache.evict(metricName); - } - - return null; - } - - public void putTimelineMetric(TimelineMetric timelineMetric) { - timelineMetricCache.put(timelineMetric.getMetricName(), timelineMetric); - } -}