Repository: ambari Updated Branches: refs/heads/trunk 9cdd4e244 -> b42150cad
http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricsSink.java deleted file mode 100644 index a843428..0000000 --- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricsSink.java +++ /dev/null @@ -1,211 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.metrics2.sink.timeline; - -import org.apache.commons.configuration.SubsetConfiguration; -import org.apache.commons.httpclient.HttpClient; -import org.apache.commons.httpclient.methods.PostMethod; -import org.apache.commons.httpclient.methods.StringRequestEntity; -import org.apache.commons.lang.ClassUtils; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.metrics2.AbstractMetric; -import org.apache.hadoop.metrics2.MetricsException; -import org.apache.hadoop.metrics2.MetricsRecord; -import org.apache.hadoop.metrics2.MetricsTag; -import org.apache.hadoop.metrics2.impl.MsInfo; -import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricsCache; -import org.codehaus.jackson.map.AnnotationIntrospector; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.annotate.JsonSerialize; -import org.codehaus.jackson.xc.JaxbAnnotationIntrospector; -import java.io.IOException; -import java.net.SocketAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -@InterfaceAudience.Public -@InterfaceStability.Evolving -public class TimelineMetricsSink extends AbstractTimelineMetricsSink { - private static ObjectMapper mapper; - private Map<String, Set<String>> useTagsMap = new HashMap<String, Set<String>>(); - private static final String TAGS_FOR_PREFIX_PROPERTY_PREFIX = "tagsForPrefix."; - private static final String MAX_METRIC_ROW_CACHE_SIZE = "maxRowCacheSize"; - private static final String METRICS_SEND_INTERVAL = "sendInterval"; - protected HttpClient httpClient = new HttpClient(); - private TimelineMetricsCache metricsCache; - - static { - mapper = new ObjectMapper(); - AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(); - mapper.setAnnotationIntrospector(introspector); - mapper.getSerializationConfig() - .setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL); - } - - @Override - public void init(SubsetConfiguration conf) { - super.init(conf); - - int maxRowCacheSize = conf.getInt(MAX_METRIC_ROW_CACHE_SIZE, - TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT); - int metricsSendInterval = conf.getInt(METRICS_SEND_INTERVAL, - TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS); // ~ 1 min - metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval); - - conf.setListDelimiter(','); - Iterator<String> it = (Iterator<String>) conf.getKeys(); - while (it.hasNext()) { - String propertyName = it.next(); - if (propertyName != null && propertyName.startsWith(TAGS_FOR_PREFIX_PROPERTY_PREFIX)) { - String contextName = propertyName.substring(TAGS_FOR_PREFIX_PROPERTY_PREFIX.length()); - String[] tags = conf.getStringArray(propertyName); - boolean useAllTags = false; - Set<String> set = null; - if (tags.length > 0) { - set = new HashSet<String>(); - for (String tag : tags) { - tag = tag.trim(); - useAllTags |= tag.equals("*"); - if (tag.length() > 0) { - set.add(tag); - } - } - if (useAllTags) { - set = null; - } - } - useTagsMap.put(contextName, set); - } - } - } - - @Override - public void putMetrics(MetricsRecord record) { - try { - String recordName = record.name(); - String contextName = record.context(); - - StringBuilder sb = new StringBuilder(); - sb.append(contextName); - sb.append('.'); - sb.append(recordName); - - appendPrefix(record, sb); - sb.append("."); - int sbBaseLen = sb.length(); - - Collection<AbstractMetric> metrics = - (Collection<AbstractMetric>) record.metrics(); - - List<TimelineMetric> metricList = new ArrayList<TimelineMetric>(); - - for (AbstractMetric metric : metrics) { - sb.append(metric.name()); - String name = sb.toString(); - TimelineMetric timelineMetric = new TimelineMetric(); - timelineMetric.setMetricName(name); - timelineMetric.setHostName(getHostName()); - timelineMetric.setAppId(getServiceName()); - timelineMetric.setStartTime(record.timestamp()); - timelineMetric.setType(ClassUtils.getShortCanonicalName( - metric.value(), "Number")); - timelineMetric.getMetricValues().put(record.timestamp(), - metric.value().doubleValue()); - // Put intermediate values into the cache until it is time to send - metricsCache.putTimelineMetric(timelineMetric); - - // Retrieve all values from cache if it is time to send - TimelineMetric cachedMetric = metricsCache.getTimelineMetric(name); - - if (cachedMetric != null) { - metricList.add(cachedMetric); - } - - sb.setLength(sbBaseLen); - } - - TimelineMetrics timelineMetrics = new TimelineMetrics(); - timelineMetrics.setMetrics(metricList); - - if (!metricList.isEmpty()) { - emitMetrics(timelineMetrics); - } - - - } catch (IOException io) { - throw new MetricsException("Failed to putMetrics", io); - } - } - - private void emitMetrics(TimelineMetrics metrics) throws IOException { - String jsonData = mapper.writeValueAsString(metrics); - - SocketAddress socketAddress = getServerSocketAddress(); - - if (socketAddress != null) { - StringRequestEntity requestEntity = new StringRequestEntity( - jsonData, "application/json", "UTF-8"); - - PostMethod postMethod = new PostMethod(getCollectorUri()); - postMethod.setRequestEntity(requestEntity); - int statusCode = httpClient.executeMethod(postMethod); - if (statusCode != 200) { - LOG.info("Unable to POST metrics to collector, " + getCollectorUri()); - } - } - } - - // Taken as is from Ganglia30 implementation - @InterfaceAudience.Private - public void appendPrefix(MetricsRecord record, StringBuilder sb) { - String contextName = record.context(); - Collection<MetricsTag> tags = record.tags(); - if (useTagsMap.containsKey(contextName)) { - Set<String> useTags = useTagsMap.get(contextName); - for (MetricsTag t : tags) { - if (useTags == null || useTags.contains(t.name())) { - - // the context is always skipped here because it is always added - - // the hostname is always skipped to avoid case-mismatches - // from different DNSes. - - if (t.info() != MsInfo.Context && t.info() != MsInfo.Hostname && t.value() != null) { - sb.append('.').append(t.name()).append('=').append(t.value()); - } - } - } - } - } - - @Override - public void flush() { - // TODO: Buffering implementation - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/ambari-metrics-timelineservice/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-timelineservice/pom.xml index d0a72ae..dca1772 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/pom.xml +++ b/ambari-metrics/ambari-metrics-timelineservice/pom.xml @@ -271,8 +271,8 @@ </dependency> <dependency> - <artifactId>ambari-metrics-hadoop-sink</artifactId> <groupId>org.apache.ambari</groupId> + <artifactId>ambari-metrics-common</artifactId> <version>0.1.0-SNAPSHOT</version> </dependency> http://git-wip-us.apache.org/repos/asf/ambari/blob/b42150ca/ambari-metrics/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/pom.xml b/ambari-metrics/pom.xml index 8549c26..b090205 100644 --- a/ambari-metrics/pom.xml +++ b/ambari-metrics/pom.xml @@ -26,7 +26,9 @@ <version>0.1.0-SNAPSHOT</version> <packaging>pom</packaging> <modules> + <module>ambari-metrics-common</module> <module>ambari-metrics-hadoop-sink</module> + <module>ambari-metrics-flume-sink</module> <module>ambari-metrics-timelineservice</module> <module>ambari-metrics-host-monitoring</module> </modules>