AMBARI-21079. Add ability to sink Raw metrics to external system via Http. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c32eebf8 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c32eebf8 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c32eebf8 Branch: refs/heads/branch-3.0-ams Commit: c32eebf8935acab9d1a510bf05e8f8aeb8873a6f Parents: cd769e2 Author: Siddharth Wagle <swa...@hortonworks.com> Authored: Tue May 23 14:01:14 2017 -0700 Committer: Siddharth Wagle <swa...@hortonworks.com> Committed: Tue May 23 14:01:14 2017 -0700 ---------------------------------------------------------------------- ambari-metrics/ambari-metrics-common/pom.xml | 29 ++- .../TimelineMetricsEhCacheSizeOfEngine.java | 115 +++++++++ .../ApplicationHistoryServer.java | 13 +- .../timeline/HBaseTimelineMetricStore.java | 30 ++- .../metrics/timeline/PhoenixHBaseAccessor.java | 230 ++++++++++-------- .../timeline/TimelineMetricConfiguration.java | 176 +++++++++----- .../TimelineMetricClusterAggregator.java | 2 +- .../TimelineMetricMetadataManager.java | 15 +- .../timeline/sink/DefaultFSSinkProvider.java | 153 ++++++++++++ .../timeline/sink/ExternalMetricsSink.java | 48 ++++ .../timeline/sink/ExternalSinkProvider.java | 35 +++ .../metrics/timeline/sink/HttpSinkProvider.java | 231 +++++++++++++++++++ .../DefaultInternalMetricsSourceProvider.java | 42 ++++ .../timeline/source/InternalMetricsSource.java | 30 +++ .../timeline/source/InternalSourceProvider.java | 39 ++++ .../timeline/source/RawMetricsSource.java | 93 ++++++++ .../source/cache/InternalMetricCacheKey.java | 109 +++++++++ .../source/cache/InternalMetricCacheValue.java | 37 +++ .../source/cache/InternalMetricsCache.java | 231 +++++++++++++++++++ .../cache/InternalMetricsCacheProvider.java | 48 ++++ .../cache/InternalMetricsCacheSizeOfEngine.java | 66 ++++++ .../TestApplicationHistoryServer.java | 4 +- .../timeline/AbstractMiniHBaseClusterTest.java | 49 ++-- .../timeline/HBaseTimelineMetricStoreTest.java | 8 +- .../timeline/ITPhoenixHBaseAccessor.java | 110 ++++----- .../timeline/PhoenixHBaseAccessorTest.java | 167 ++++++-------- .../TimelineMetricStoreWatcherTest.java | 4 +- .../aggregators/ITClusterAggregator.java | 72 +++--- .../timeline/discovery/TestMetadataManager.java | 2 +- .../timeline/discovery/TestMetadataSync.java | 6 +- .../timeline/source/RawMetricsSourceTest.java | 142 ++++++++++++ .../cache/TimelineMetricsCacheSizeOfEngine.java | 71 +----- pom.xml | 1 + 33 files changed, 1933 insertions(+), 475 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-common/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/pom.xml b/ambari-metrics/ambari-metrics-common/pom.xml index 62ae75f..dc2ab5e 100644 --- a/ambari-metrics/ambari-metrics-common/pom.xml +++ b/ambari-metrics/ambari-metrics-common/pom.xml @@ -70,43 +70,47 @@ <relocations> <relocation> <pattern>com.google</pattern> - <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.google</shadedPattern> + <shadedPattern>org.apache.ambari.metrics.sink.relocated.google</shadedPattern> </relocation> <relocation> <pattern>org.apache.commons.io</pattern> - <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.commons.io</shadedPattern>StormTimelineMetricsReporter + <shadedPattern>org.apache.ambari.metrics.sink.relocated.commons.io</shadedPattern>StormTimelineMetricsReporter </relocation> <relocation> <pattern>org.apache.commons.lang</pattern> - <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.commons.lang</shadedPattern> + <shadedPattern>org.apache.ambari.metrics.relocated.commons.lang</shadedPattern> </relocation> <relocation> <pattern>org.apache.curator</pattern> - <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.curator</shadedPattern> + <shadedPattern>org.apache.ambari.metrics.sink.relocated.curator</shadedPattern> </relocation> <relocation> <pattern>org.apache.jute</pattern> - <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.jute</shadedPattern> + <shadedPattern>org.apache.ambari.metrics.sink.relocated.jute</shadedPattern> </relocation> <relocation> <pattern>org.apache.zookeeper</pattern> - <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.zookeeper</shadedPattern> + <shadedPattern>org.apache.ambari.metrics.sink.relocated.zookeeper</shadedPattern> </relocation> <relocation> <pattern>org.slf4j</pattern> - <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.slf4j</shadedPattern> + <shadedPattern>org.apache.ambari.metrics.sink.relocated.slf4j</shadedPattern> </relocation> <relocation> <pattern>org.apache.log4j</pattern> - <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.log4j</shadedPattern> + <shadedPattern>org.apache.ambari.metrics.sink.relocated.log4j</shadedPattern> </relocation> <relocation> <pattern>jline</pattern> - <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.jline</shadedPattern> + <shadedPattern>org.apache.ambari.metrics.sink.relocated.jline</shadedPattern> </relocation> <relocation> <pattern>org.jboss</pattern> - <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.jboss</shadedPattern> + <shadedPattern>org.apache.ambari.metrics.sink.relocated.jboss</shadedPattern> + </relocation> + <relocation> + <pattern>net.sf.ehcache</pattern> + <shadedPattern>org.apache.ambari.metrics.sink.relocated.ehcache</shadedPattern> </relocation> </relocations> </configuration> @@ -118,6 +122,11 @@ <dependencies> <dependency> + <groupId>net.sf.ehcache</groupId> + <artifactId>ehcache</artifactId> + <version>2.10.0</version> + </dependency> + <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.1.1</version> http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsEhCacheSizeOfEngine.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsEhCacheSizeOfEngine.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsEhCacheSizeOfEngine.java new file mode 100644 index 0000000..ea694b7 --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsEhCacheSizeOfEngine.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.sink.timeline.cache; + +import java.util.Map; +import java.util.TreeMap; + +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import net.sf.ehcache.pool.SizeOfEngine; +import net.sf.ehcache.pool.impl.DefaultSizeOfEngine; +import net.sf.ehcache.pool.sizeof.ReflectionSizeOf; +import net.sf.ehcache.pool.sizeof.SizeOf; + +/** + * Cache sizing engine that reduces reflective calls over the Object graph to + * find total Heap usage. Used for ehcache based on available memory. + */ +public abstract class TimelineMetricsEhCacheSizeOfEngine implements SizeOfEngine { + private final static Logger LOG = LoggerFactory.getLogger(TimelineMetricsEhCacheSizeOfEngine.class); + + private static int DEFAULT_MAX_DEPTH = 1000; + private static boolean DEFAULT_ABORT_WHEN_MAX_DEPTH_EXCEEDED = false; + + // Base Engine + protected SizeOfEngine underlying = null; + + // Counter + protected SizeOf reflectionSizeOf = new ReflectionSizeOf(); + + // Optimizations + private volatile long timelineMetricPrimitivesApproximation = 0; + + // Map entry sizing + private long sizeOfMapEntry; + private long sizeOfMapEntryOverhead; + + protected TimelineMetricsEhCacheSizeOfEngine(SizeOfEngine underlying) { + this.underlying = underlying; + } + + public TimelineMetricsEhCacheSizeOfEngine() { + this(new DefaultSizeOfEngine(DEFAULT_MAX_DEPTH, DEFAULT_ABORT_WHEN_MAX_DEPTH_EXCEEDED)); + + this.sizeOfMapEntry = reflectionSizeOf.sizeOf(new Long(1)) + + reflectionSizeOf.sizeOf(new Double(2.0)); + + //SizeOfMapEntryOverhead = SizeOfMapWithOneEntry - (SizeOfEmptyMap + SizeOfOneEntry) + TreeMap<Long, Double> map = new TreeMap<>(); + long emptyMapSize = reflectionSizeOf.sizeOf(map); + map.put(new Long(1), new Double(2.0)); + long sizeOfMapOneEntry = reflectionSizeOf.deepSizeOf(DEFAULT_MAX_DEPTH, DEFAULT_ABORT_WHEN_MAX_DEPTH_EXCEEDED, map).getCalculated(); + this.sizeOfMapEntryOverhead = sizeOfMapOneEntry - (emptyMapSize + this.sizeOfMapEntry); + + LOG.info("Creating custom sizeof engine for TimelineMetrics."); + } + + /** + * Return size of the metrics TreeMap in an optimized way. + * + */ + protected long getTimelineMetricsSize(TimelineMetrics metrics) { + long size = 8; // Object reference + + if (metrics != null) { + for (TimelineMetric metric : metrics.getMetrics()) { + + if (timelineMetricPrimitivesApproximation == 0) { + timelineMetricPrimitivesApproximation += reflectionSizeOf.sizeOf(metric.getMetricName()); + timelineMetricPrimitivesApproximation += reflectionSizeOf.sizeOf(metric.getAppId()); + timelineMetricPrimitivesApproximation += reflectionSizeOf.sizeOf(metric.getHostName()); + timelineMetricPrimitivesApproximation += reflectionSizeOf.sizeOf(metric.getInstanceId()); + timelineMetricPrimitivesApproximation += reflectionSizeOf.sizeOf(metric.getTimestamp()); + timelineMetricPrimitivesApproximation += reflectionSizeOf.sizeOf(metric.getStartTime()); + timelineMetricPrimitivesApproximation += reflectionSizeOf.sizeOf(metric.getType()); + timelineMetricPrimitivesApproximation += 8; // Object overhead + + LOG.debug("timelineMetricPrimitivesApproximation bytes = " + timelineMetricPrimitivesApproximation); + } + size += timelineMetricPrimitivesApproximation; + size += getValueMapSize(metric.getMetricValues()); + } + LOG.debug("Total Size of metric values in cache: " + size); + } + return size; + } + + protected long getValueMapSize(Map<Long, Double> metricValues) { + long size = 0; + if (metricValues != null && !metricValues.isEmpty()) { + // Numeric wrapper: 12 bytes + 8 bytes Data type + 4 bytes alignment = 48 (Long, Double) + // Tree Map: 12 bytes for header + 20 bytes for 5 object fields : pointers + 1 byte for flag = 40 + LOG.debug("Size of metric value: " + (sizeOfMapEntry + sizeOfMapEntryOverhead) * metricValues.size()); + size += (sizeOfMapEntry + sizeOfMapEntryOverhead) * metricValues.size(); // Treemap size is O(1) + } + return size; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java index 1ca9c33..331670d 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java @@ -34,7 +34,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.HBaseTimelineMetricStore; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.HBaseTimelineMetricsService; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricStore; import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.MemoryTimelineStore; @@ -71,7 +71,7 @@ public class ApplicationHistoryServer extends CompositeService { @Override protected void serviceInit(Configuration conf) throws Exception { - metricConfiguration = new TimelineMetricConfiguration(); + metricConfiguration = TimelineMetricConfiguration.getInstance(); metricConfiguration.initialize(); historyManager = createApplicationHistory(); ahsClientService = createApplicationHistoryClientService(historyManager); @@ -164,11 +164,16 @@ public class ApplicationHistoryServer extends CompositeService { protected TimelineMetricStore createTimelineMetricStore(Configuration conf) { LOG.info("Creating metrics store."); - return new HBaseTimelineMetricStore(metricConfiguration); + return new HBaseTimelineMetricsService(metricConfiguration); } protected void startWebApp() { - String bindAddress = metricConfiguration.getWebappAddress(); + String bindAddress = null; + try { + bindAddress = metricConfiguration.getWebappAddress(); + } catch (Exception e) { + throw new ExceptionInInitializerError("Cannot find bind address"); + } LOG.info("Instantiating AHSWebApp at " + bindAddress); try { Configuration conf = metricConfiguration.getMetricsConf(); http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java index f984253..c8eb65f 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java @@ -51,6 +51,8 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunctionFactory; import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URISyntaxException; import java.net.UnknownHostException; import java.sql.SQLException; import java.util.ArrayList; @@ -71,9 +73,9 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES; -public class HBaseTimelineMetricStore extends AbstractService implements TimelineMetricStore { +public class HBaseTimelineMetricsService extends AbstractService implements TimelineMetricStore { - static final Log LOG = LogFactory.getLog(HBaseTimelineMetricStore.class); + static final Log LOG = LogFactory.getLog(HBaseTimelineMetricsService.class); private final TimelineMetricConfiguration configuration; private PhoenixHBaseAccessor hBaseAccessor; private static volatile boolean isInitialized = false; @@ -87,25 +89,28 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin * Construct the service. * */ - public HBaseTimelineMetricStore(TimelineMetricConfiguration configuration) { - super(HBaseTimelineMetricStore.class.getName()); + public HBaseTimelineMetricsService(TimelineMetricConfiguration configuration) { + super(HBaseTimelineMetricsService.class.getName()); this.configuration = configuration; } @Override protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); - initializeSubsystem(configuration.getHbaseConf(), configuration.getMetricsConf()); + initializeSubsystem(); } - private synchronized void initializeSubsystem(Configuration hbaseConf, - Configuration metricsConf) { + private synchronized void initializeSubsystem() { if (!isInitialized) { - hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf); + hBaseAccessor = new PhoenixHBaseAccessor(null); // Initialize schema hBaseAccessor.initMetricSchema(); // Initialize metadata from store - metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor, metricsConf); + try { + metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor); + } catch (MalformedURLException | URISyntaxException e) { + throw new ExceptionInInitializerError("Unable to initialize metadata manager"); + } metricMetadataManager.initializeMetadata(); // Initialize policies before TTL update hBaseAccessor.initPoliciesAndTTL(); @@ -127,6 +132,13 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin //Initialize whitelisting & blacklisting if needed TimelineMetricsFilter.initializeMetricFilter(configuration); + Configuration metricsConf = null; + try { + metricsConf = configuration.getMetricsConf(); + } catch (Exception e) { + throw new ExceptionInInitializerError("Cannot initialize configuration."); + } + defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(DEFAULT_TOPN_HOSTS_LIMIT, "20")); if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) { LOG.info("Using group by aggregators for aggregating host and cluster metrics."); http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java index 3b2a119..15b0bb8 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -17,88 +17,18 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; -import com.google.common.collect.Maps; -import com.google.common.collect.Multimap; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.util.RetryCounter; -import org.apache.hadoop.hbase.util.RetryCounterFactory; -import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; -import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; -import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; -import org.apache.hadoop.metrics2.sink.timeline.Precision; -import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.SplitByMetricNamesCondition; -import org.apache.hadoop.yarn.util.timeline.TimelineUtils; -import org.apache.phoenix.exception.PhoenixIOException; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.type.TypeReference; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATE_TABLE_SPLIT_POINTS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_CLASS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_KEY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_CLASS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_KEY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_DURABILITY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_BLOCKING_STORE_FILES; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATORS_SKIP_BLOCK_CACHE; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_SECOND_TABLE_TTL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CONTAINER_METRICS_TTL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RETRY_INTERVAL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_BLOCKING_STORE_FILES; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_DAILY_TABLE_TTL; @@ -107,11 +37,19 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_SPLIT_POINTS; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CONTAINER_METRICS_TTL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_ENABLED; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_CLASS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_KEY; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_CLASS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_KEY; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_DURABILITY; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_METRICS_METADATA_TABLE; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_CONTAINER_METRICS_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL; @@ -120,7 +58,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_METADATA_TABLE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_METRICS_METADATA_TABLE; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION; @@ -139,11 +76,80 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CONTAINER_METRICS_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_HOSTED_APPS_METADATA_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_INSTANCE_HOST_METADATA_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METADATA_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CONTAINER_METRICS_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME.RAW_METRICS; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounterFactory; +import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; +import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; +import org.apache.hadoop.metrics2.sink.timeline.Precision; +import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.SplitByMetricNamesCondition; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalSinkProvider; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalMetricsSource; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.apache.phoenix.exception.PhoenixIOException; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; + +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; /** @@ -198,16 +204,29 @@ public class PhoenixHBaseAccessor { private HashMap<String, String> tableTTL = new HashMap<>(); - public PhoenixHBaseAccessor(Configuration hbaseConf, - Configuration metricsConf){ - this(hbaseConf, metricsConf, new DefaultPhoenixDataSource(hbaseConf)); + private final TimelineMetricConfiguration configuration; + private InternalMetricsSource rawMetricsSource; + + public PhoenixHBaseAccessor(PhoenixConnectionProvider dataSource) { + this(TimelineMetricConfiguration.getInstance(), dataSource); } - PhoenixHBaseAccessor(Configuration hbaseConf, - Configuration metricsConf, + // Test friendly construction since mock instrumentation is difficult to get + // working with hadoop mini cluster + PhoenixHBaseAccessor(TimelineMetricConfiguration configuration, PhoenixConnectionProvider dataSource) { - this.hbaseConf = hbaseConf; - this.metricsConf = metricsConf; + this.configuration = TimelineMetricConfiguration.getInstance(); + try { + this.hbaseConf = configuration.getHbaseConf(); + this.metricsConf = configuration.getMetricsConf(); + } catch (Exception e) { + throw new ExceptionInInitializerError("Cannot initialize configuration."); + } + if (dataSource == null) { + dataSource = new DefaultPhoenixDataSource(hbaseConf); + } + this.dataSource = dataSource; + RESULTSET_LIMIT = metricsConf.getInt(GLOBAL_RESULT_LIMIT, RESULTSET_LIMIT); try { Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); @@ -215,7 +234,7 @@ public class PhoenixHBaseAccessor { LOG.error("Phoenix client jar not found in the classpath.", e); throw new IllegalStateException(e); } - this.dataSource = dataSource; + this.retryCounterFactory = new RetryCounterFactory(metricsConf.getInt(GLOBAL_MAX_RETRIES, 10), (int) SECONDS.toMillis(metricsConf.getInt(GLOBAL_RETRY_INTERVAL, 3))); this.outOfBandTimeAllowance = metricsConf.getLong(OUT_OFF_BAND_DATA_TIME_ALLOWANCE, @@ -249,10 +268,20 @@ public class PhoenixHBaseAccessor { metricsConf.getClass(TIMELINE_METRIC_AGGREGATOR_SINK_CLASS, null, TimelineMetricsAggregatorSink.class); if (metricSinkClass != null) { - aggregatorSink = - ReflectionUtils.newInstance(metricSinkClass, metricsConf); + aggregatorSink = ReflectionUtils.newInstance(metricSinkClass, metricsConf); LOG.info("Initialized aggregator sink class " + metricSinkClass); } + + ExternalSinkProvider externalSinkProvider = configuration.getExternalSinkProvider(); + InternalSourceProvider internalSourceProvider = configuration.getInternalSourceProvider(); + if (externalSinkProvider != null) { + ExternalMetricsSink rawMetricsSink = externalSinkProvider.getExternalMetricsSink(RAW_METRICS); + int interval = configuration.getExternalSinkInterval(RAW_METRICS); + if (interval == -1){ + interval = cacheCommitInterval; + } + rawMetricsSource = internalSourceProvider.getInternalMetricsSource(RAW_METRICS, interval, rawMetricsSink); + } } public boolean isInsertCacheEmpty() { @@ -261,12 +290,15 @@ public class PhoenixHBaseAccessor { public void commitMetricsFromCache() { LOG.debug("Clearing metrics cache"); - List<TimelineMetrics> metricsArray = new ArrayList<TimelineMetrics>(insertCache.size()); - while (!insertCache.isEmpty()) { - metricsArray.add(insertCache.poll()); + List<TimelineMetrics> metricsList = new ArrayList<TimelineMetrics>(insertCache.size()); + if (!insertCache.isEmpty()) { + insertCache.drainTo(metricsList); // More performant than poll } - if (metricsArray.size() > 0) { - commitMetrics(metricsArray); + if (metricsList.size() > 0) { + commitMetrics(metricsList); + if (rawMetricsSource != null) { + rawMetricsSource.publishTimelineMetrics(metricsList); + } } } @@ -367,7 +399,7 @@ public class PhoenixHBaseAccessor { } @SuppressWarnings("unchecked") - public static TreeMap<Long, Double> readMetricFromJSON(String json) throws IOException { + public static TreeMap<Long, Double> readMetricFromJSON(String json) throws IOException { return mapper.readValue(json, metricValuesTypeRef); } @@ -701,6 +733,9 @@ public class PhoenixHBaseAccessor { return ""; } + /** + * Insert precision YARN container data. + */ public void insertContainerMetrics(List<ContainerMetric> metrics) throws SQLException, IOException { Connection conn = getConnection(); @@ -766,6 +801,9 @@ public class PhoenixHBaseAccessor { } } + /** + * Insert precision data. + */ public void insertMetricRecordsWithMetadata(TimelineMetricMetadataManager metadataManager, TimelineMetrics metrics, boolean skipCache) throws SQLException, IOException { List<TimelineMetric> timelineMetrics = metrics.getMetrics(); @@ -1385,9 +1423,7 @@ public class PhoenixHBaseAccessor { try { aggregatorSink.saveClusterAggregateRecords(records); } catch (Exception e) { - LOG.warn( - "Error writing cluster aggregate records metrics to external sink. " - + e); + LOG.warn("Error writing cluster aggregate records metrics to external sink. ", e); } } } @@ -1398,8 +1434,8 @@ public class PhoenixHBaseAccessor { * * @throws SQLException */ - public void saveClusterTimeAggregateRecords(Map<TimelineClusterMetric, MetricHostAggregate> records, - String tableName) throws SQLException { + public void saveClusterAggregateRecordsSecond(Map<TimelineClusterMetric, MetricHostAggregate> records, + String tableName) throws SQLException { if (records == null || records.isEmpty()) { LOG.debug("Empty aggregate records."); return; http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java index 023465b..de33bd1 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java @@ -17,15 +17,8 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; - import java.io.BufferedReader; -import java.io.IOException; +import java.io.File; import java.io.InputStream; import java.io.InputStreamReader; import java.net.InetAddress; @@ -37,6 +30,21 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalSinkProvider; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.DefaultInternalMetricsSourceProvider; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME; +import org.apache.log4j.Appender; +import org.apache.log4j.FileAppender; +import org.apache.log4j.Logger; + /** * Configuration class that reads properties from ams-site.xml. All values * for time or intervals are given in seconds. @@ -56,6 +64,12 @@ public class TimelineMetricConfiguration { public static final String TIMELINE_METRIC_AGGREGATOR_SINK_CLASS = "timeline.metrics.service.aggregator.sink.class"; + public static final String TIMELINE_METRICS_SOURCE_PROVIDER_CLASS = + "timeline.metrics.service.source.provider.class"; + + public static final String TIMELINE_METRICS_SINK_PROVIDER_CLASS = + "timeline.metrics.service.sink.provider.class"; + public static final String TIMELINE_METRICS_CACHE_SIZE = "timeline.metrics.cache.size"; @@ -297,38 +311,63 @@ public class TimelineMetricConfiguration { public static final String AMSHBASE_METRICS_WHITESLIST_FILE = "amshbase_metrics_whitelist"; public static final String TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION = "timeline.metrics.host.inmemory.aggregation"; + public static final String INTERNAL_CACHE_HEAP_PERCENT = + "timeline.metrics.service.cache.%s.heap.percent"; + + public static final String EXTERNAL_SINK_INTERVAL = + "timeline.metrics.service.external.sink.%s.interval"; + + public static final String DEFAULT_EXTERNAL_SINK_DIR = + "timeline.metrics.service.external.sink.dir"; private Configuration hbaseConf; private Configuration metricsConf; private Configuration amsEnvConf; private volatile boolean isInitialized = false; + private static TimelineMetricConfiguration instance = new TimelineMetricConfiguration(); + + private TimelineMetricConfiguration() {} + + public static TimelineMetricConfiguration getInstance() { + return instance; + } + + // Tests + public TimelineMetricConfiguration(Configuration hbaseConf, Configuration metricsConf) { + this.hbaseConf = hbaseConf; + this.metricsConf = metricsConf; + this.isInitialized = true; + } + public void initialize() throws URISyntaxException, MalformedURLException { - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - if (classLoader == null) { - classLoader = getClass().getClassLoader(); - } - URL hbaseResUrl = classLoader.getResource(HBASE_SITE_CONFIGURATION_FILE); - URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE); - LOG.info("Found hbase site configuration: " + hbaseResUrl); - LOG.info("Found metric service configuration: " + amsResUrl); - - if (hbaseResUrl == null) { - throw new IllegalStateException("Unable to initialize the metrics " + - "subsystem. No hbase-site present in the classpath."); - } + if (!isInitialized) { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + if (classLoader == null) { + classLoader = getClass().getClassLoader(); + } + URL hbaseResUrl = classLoader.getResource(HBASE_SITE_CONFIGURATION_FILE); + URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE); + LOG.info("Found hbase site configuration: " + hbaseResUrl); + LOG.info("Found metric service configuration: " + amsResUrl); + + if (hbaseResUrl == null) { + throw new IllegalStateException("Unable to initialize the metrics " + + "subsystem. No hbase-site present in the classpath."); + } - if (amsResUrl == null) { - throw new IllegalStateException("Unable to initialize the metrics " + - "subsystem. No ams-site present in the classpath."); - } + if (amsResUrl == null) { + throw new IllegalStateException("Unable to initialize the metrics " + + "subsystem. No ams-site present in the classpath."); + } - hbaseConf = new Configuration(true); - hbaseConf.addResource(hbaseResUrl.toURI().toURL()); - metricsConf = new Configuration(true); - metricsConf.addResource(amsResUrl.toURI().toURL()); + hbaseConf = new Configuration(true); + hbaseConf.addResource(hbaseResUrl.toURI().toURL()); + metricsConf = new Configuration(true); + metricsConf.addResource(amsResUrl.toURI().toURL()); - isInitialized = true; + isInitialized = true; + } } public Configuration getHbaseConf() throws URISyntaxException, MalformedURLException { @@ -346,31 +385,19 @@ public class TimelineMetricConfiguration { } public String getZKClientPort() throws MalformedURLException, URISyntaxException { - if (!isInitialized) { - initialize(); - } - return hbaseConf.getTrimmed("hbase.zookeeper.property.clientPort", "2181"); + return getHbaseConf().getTrimmed("hbase.zookeeper.property.clientPort", "2181"); } public String getZKQuorum() throws MalformedURLException, URISyntaxException { - if (!isInitialized) { - initialize(); - } - return hbaseConf.getTrimmed("hbase.zookeeper.quorum"); + return getHbaseConf().getTrimmed("hbase.zookeeper.quorum"); } public String getClusterZKClientPort() throws MalformedURLException, URISyntaxException { - if (!isInitialized) { - initialize(); - } - return metricsConf.getTrimmed("cluster.zookeeper.property.clientPort", "2181"); + return getMetricsConf().getTrimmed("cluster.zookeeper.property.clientPort", "2181"); } public String getClusterZKQuorum() throws MalformedURLException, URISyntaxException { - if (!isInitialized) { - initialize(); - } - return metricsConf.getTrimmed("cluster.zookeeper.quorum"); + return getMetricsConf().getTrimmed("cluster.zookeeper.quorum"); } public String getInstanceHostnameFromEnv() throws UnknownHostException { @@ -390,12 +417,9 @@ public class TimelineMetricConfiguration { return DEFAULT_INSTANCE_PORT; } - public String getWebappAddress() { + public String getWebappAddress() throws MalformedURLException, URISyntaxException { String defaultHttpAddress = "0.0.0.0:6188"; - if (metricsConf != null) { - return metricsConf.get(WEBAPP_HTTP_ADDRESS, defaultHttpAddress); - } - return defaultHttpAddress; + return getMetricsConf().get(WEBAPP_HTTP_ADDRESS, defaultHttpAddress); } public int getTimelineMetricsServiceHandlerThreadCount() { @@ -450,8 +474,8 @@ public class TimelineMetricConfiguration { public boolean isDistributedCollectorModeDisabled() { try { - if (metricsConf != null) { - return Boolean.parseBoolean(metricsConf.get("timeline.metrics.service.distributed.collector.mode.disabled", "false")); + if (getMetricsConf() != null) { + return Boolean.parseBoolean(getMetricsConf().get("timeline.metrics.service.distributed.collector.mode.disabled", "false")); } return false; } catch (Exception e) { @@ -497,4 +521,50 @@ public class TimelineMetricConfiguration { return whitelist; } + + public int getExternalSinkInterval(SOURCE_NAME sourceName) { + return Integer.parseInt(metricsConf.get(String.format(EXTERNAL_SINK_INTERVAL, sourceName), "-1")); + } + + public InternalSourceProvider getInternalSourceProvider() { + Class<? extends InternalSourceProvider> providerClass = + metricsConf.getClass(TIMELINE_METRICS_SOURCE_PROVIDER_CLASS, + DefaultInternalMetricsSourceProvider.class, InternalSourceProvider.class); + return ReflectionUtils.newInstance(providerClass, metricsConf); + } + + public ExternalSinkProvider getExternalSinkProvider() { + Class<?> providerClass = metricsConf.getClassByNameOrNull(TIMELINE_METRICS_SINK_PROVIDER_CLASS); + if (providerClass != null) { + return (ExternalSinkProvider) ReflectionUtils.newInstance(providerClass, metricsConf); + } + return null; + } + + public String getInternalCacheHeapPercent(String instanceName) { + String heapPercent = metricsConf.get(String.format(INTERNAL_CACHE_HEAP_PERCENT, instanceName)); + if (StringUtils.isEmpty(heapPercent)) { + return "5%"; + } else { + return heapPercent.endsWith("%") ? heapPercent : heapPercent + "%"; + } + } + + public String getDefaultMetricsSinkDir() { + String dirPath = metricsConf.get(DEFAULT_EXTERNAL_SINK_DIR); + if (dirPath == null) { + // Only one logger at the time of writing + Appender appender = (Appender) Logger.getRootLogger().getAllAppenders().nextElement(); + if (appender instanceof FileAppender) { + File f = new File(((FileAppender) appender).getFile()); + if (f.exists()) { + dirPath = f.getParent(); + } else { + dirPath = "/tmp"; + } + } + } + + return dirPath; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java index ba16b43..74d4013 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java @@ -83,7 +83,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime); LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates."); - hBaseAccessor.saveClusterTimeAggregateRecords(hostAggregateMap, outputTableName); + hBaseAccessor.saveClusterAggregateRecordsSecond(hostAggregateMap, outputTableName); } private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs, long endTime) http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java index f904ebe..8a71756 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java @@ -27,8 +27,11 @@ import org.apache.hadoop.metrics2.sink.timeline.MetadataException; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; +import java.net.MalformedURLException; +import java.net.URISyntaxException; import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; @@ -69,17 +72,21 @@ public class TimelineMetricMetadataManager { // Filter metrics names matching given patterns, from metadata final List<String> metricNameFilters = new ArrayList<>(); - public TimelineMetricMetadataManager(PhoenixHBaseAccessor hBaseAccessor, - Configuration metricsConf) { - this.hBaseAccessor = hBaseAccessor; + // Test friendly construction since mock instrumentation is difficult to get + // working with hadoop mini cluster + public TimelineMetricMetadataManager(Configuration metricsConf, PhoenixHBaseAccessor hBaseAccessor) { this.metricsConf = metricsConf; - + this.hBaseAccessor = hBaseAccessor; String patternStrings = metricsConf.get(TIMELINE_METRIC_METADATA_FILTERS); if (!StringUtils.isEmpty(patternStrings)) { metricNameFilters.addAll(Arrays.asList(patternStrings.split(","))); } } + public TimelineMetricMetadataManager(PhoenixHBaseAccessor hBaseAccessor) throws MalformedURLException, URISyntaxException { + this(TimelineMetricConfiguration.getInstance().getMetricsConf(), hBaseAccessor); + } + /** * Initialize Metadata from the store */ http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/DefaultFSSinkProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/DefaultFSSinkProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/DefaultFSSinkProvider.java new file mode 100644 index 0000000..6ec6cf9 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/DefaultFSSinkProvider.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink; + +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.Date; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider; + +public class DefaultFSSinkProvider implements ExternalSinkProvider { + private static final Log LOG = LogFactory.getLog(DefaultFSSinkProvider.class); + TimelineMetricConfiguration conf = TimelineMetricConfiguration.getInstance(); + private final DefaultExternalMetricsSink sink = new DefaultExternalMetricsSink(); + private long FIXED_FILE_SIZE; + private final String SINK_FILE_NAME = "external-metrics-sink.dat"; + private final String SEPARATOR = ", "; + private final String LINE_SEP = System.lineSeparator(); + private final String HEADERS = "METRIC, APP_ID, INSTANCE_ID, HOSTNAME, START_TIME, DATA"; + + public DefaultFSSinkProvider() { + try { + FIXED_FILE_SIZE = conf.getMetricsConf().getLong("timeline.metrics.service.external.fs.sink.filesize", FileUtils.ONE_MB * 100); + } catch (Exception ignored) { + FIXED_FILE_SIZE = FileUtils.ONE_MB * 100; + } + } + + @Override + public ExternalMetricsSink getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME sourceName) { + return sink; + } + + class DefaultExternalMetricsSink implements ExternalMetricsSink { + + @Override + public int getSinkTimeOutSeconds() { + return 10; + } + + @Override + public int getFlushSeconds() { + try { + return conf.getMetricsConf().getInt(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3); + } catch (Exception e) { + LOG.warn("Cannot read cache commit interval."); + } + return 3; + } + + private boolean createFile(File f) { + boolean created = false; + if (!f.exists()) { + try { + created = f.createNewFile(); + FileUtils.writeStringToFile(f, HEADERS); + } catch (IOException e) { + LOG.error("Cannot create " + SINK_FILE_NAME + " at " + f.getPath()); + return false; + } + } + + return created; + } + + private boolean shouldReCreate(File f) { + if (!f.exists()) { + return true; + } + if (FileUtils.sizeOf(f) > FIXED_FILE_SIZE) { + return true; + } + return false; + } + + @Override + public void sinkMetricData(Collection<TimelineMetrics> metrics) { + String dirPath = TimelineMetricConfiguration.getInstance().getDefaultMetricsSinkDir(); + File dir = new File(dirPath); + if (!dir.exists()) { + LOG.error("Cannot sink data to file system, incorrect dir path " + dirPath); + return; + } + + File f = FileUtils.getFile(dirPath, SINK_FILE_NAME); + if (shouldReCreate(f)) { + if (!f.delete()) { + LOG.warn("Unable to delete external sink file."); + return; + } + createFile(f); + } + + if (metrics != null) { + for (TimelineMetrics timelineMetrics : metrics) { + for (TimelineMetric metric : timelineMetrics.getMetrics()) { + StringBuilder sb = new StringBuilder(); + sb.append(metric.getMetricName()); + sb.append(SEPARATOR); + sb.append(metric.getAppId()); + sb.append(SEPARATOR); + if (StringUtils.isEmpty(metric.getInstanceId())) { + sb.append(SEPARATOR); + } else { + sb.append(metric.getInstanceId()); + sb.append(SEPARATOR); + } + if (StringUtils.isEmpty(metric.getHostName())) { + sb.append(SEPARATOR); + } else { + sb.append(metric.getHostName()); + sb.append(SEPARATOR); + } + sb.append(new Date(metric.getStartTime())); + sb.append(SEPARATOR); + sb.append(metric.getMetricValues().toString()); + sb.append(LINE_SEP); + try { + FileUtils.writeStringToFile(f, sb.toString()); + } catch (IOException e) { + LOG.warn("Unable to sink data to file " + f.getPath()); + } + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalMetricsSink.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalMetricsSink.java new file mode 100644 index 0000000..ff06307 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalMetricsSink.java @@ -0,0 +1,48 @@ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink; + +import java.util.Collection; + +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public interface ExternalMetricsSink { + /** + * How many seconds to wait on sink before dropping metrics. + * Note: Care should be taken that this timeout does not bottleneck the + * sink thread. + */ + int getSinkTimeOutSeconds(); + + /** + * How frequently to flush data to external system. + * Default would be between 60 - 120 seconds, coherent with default sink + * interval of AMS. + */ + int getFlushSeconds(); + + /** + * Raw data stream to process / store on external system. + * The data will be held in an in-memory cache and flushed at flush seconds + * or when the cache size limit is exceeded we will flush the cache and + * drop data if write fails. + * + * @param metrics {@link Collection<TimelineMetrics>} + */ + void sinkMetricData(Collection<TimelineMetrics> metrics); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java new file mode 100644 index 0000000..48887d9 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java @@ -0,0 +1,35 @@ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink; + +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Configurable provider for sink classes that match the metrics sources. + * Provider can return same sink of different sinks for each source. + */ +public interface ExternalSinkProvider { + + /** + * Return an instance of the metrics sink for the give source + * @return {@link ExternalMetricsSink} + */ + ExternalMetricsSink getExternalMetricsSink(SOURCE_NAME sourceName); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java new file mode 100644 index 0000000..bb84c8a --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink; + +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.security.KeyStore; +import java.util.Collection; + +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.TrustManagerFactory; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider; +import org.apache.http.client.utils.URIBuilder; +import org.codehaus.jackson.map.AnnotationIntrospector; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.annotate.JsonSerialize; +import org.codehaus.jackson.xc.JaxbAnnotationIntrospector; + +public class HttpSinkProvider implements ExternalSinkProvider { + private static final Log LOG = LogFactory.getLog(HttpSinkProvider.class); + TimelineMetricConfiguration conf = TimelineMetricConfiguration.getInstance(); + + private String connectUrl; + private SSLSocketFactory sslSocketFactory; + protected static ObjectMapper mapper; + + static { + mapper = new ObjectMapper(); + AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(); + mapper.setAnnotationIntrospector(introspector); + mapper.getSerializationConfig() + .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL); + } + + public HttpSinkProvider() { + Configuration config; + try { + config = conf.getMetricsConf(); + } catch (Exception e) { + throw new ExceptionInInitializerError("Unable to read configuration for sink."); + } + String protocol = config.get("timeline.metrics.service.external.http.sink.protocol", "http"); + String host = config.get("timeline.metrics.service.external.http.sink.host", "localhost"); + String port = config.get("timeline.metrics.service.external.http.sink.port", "6189"); + + if (protocol.contains("https")) { + loadTruststore( + config.getTrimmed("timeline.metrics.service.external.http.sink.truststore.path"), + config.getTrimmed("timeline.metrics.service.external.http.sink.truststore.type"), + config.getTrimmed("timeline.metrics.service.external.http.sink.truststore.password") + ); + } + + URIBuilder uriBuilder = new URIBuilder(); + uriBuilder.setScheme(protocol); + uriBuilder.setHost(host); + uriBuilder.setPort(Integer.parseInt(port)); + connectUrl = uriBuilder.toString(); + } + + @Override + public ExternalMetricsSink getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME sourceName) { + return null; + } + + protected HttpURLConnection getConnection(String spec) throws IOException { + return (HttpURLConnection) new URL(spec).openConnection(); + } + + // Get an ssl connection + protected HttpsURLConnection getSSLConnection(String spec) + throws IOException, IllegalStateException { + + HttpsURLConnection connection = (HttpsURLConnection) (new URL(spec).openConnection()); + connection.setSSLSocketFactory(sslSocketFactory); + return connection; + } + + protected void loadTruststore(String trustStorePath, String trustStoreType, + String trustStorePassword) { + if (sslSocketFactory == null) { + if (trustStorePath == null || trustStorePassword == null) { + String msg = "Can't load TrustStore. Truststore path or password is not set."; + LOG.error(msg); + throw new IllegalStateException(msg); + } + FileInputStream in = null; + try { + in = new FileInputStream(new File(trustStorePath)); + KeyStore store = KeyStore.getInstance(trustStoreType == null ? + KeyStore.getDefaultType() : trustStoreType); + store.load(in, trustStorePassword.toCharArray()); + TrustManagerFactory tmf = TrustManagerFactory + .getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(store); + SSLContext context = SSLContext.getInstance("TLS"); + context.init(null, tmf.getTrustManagers(), null); + sslSocketFactory = context.getSocketFactory(); + } catch (Exception e) { + LOG.error("Unable to load TrustStore", e); + } finally { + if (in != null) { + try { + in.close(); + } catch (IOException e) { + LOG.error("Unable to load TrustStore", e); + } + } + } + } + } + + class DefaultHttpMetricsSink implements ExternalMetricsSink { + + @Override + public int getSinkTimeOutSeconds() { + try { + return conf.getMetricsConf().getInt("timeline.metrics.service.external.http.sink.timeout.seconds", 10); + } catch (Exception e) { + return 10; + } + } + + @Override + public int getFlushSeconds() { + try { + return conf.getMetricsConf().getInt(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3); + } catch (Exception e) { + LOG.warn("Cannot read cache commit interval."); + } + return 3; + } + + /** + * Cleans up and closes an input stream + * see http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html + * @param is the InputStream to clean up + * @return string read from the InputStream + * @throws IOException + */ + protected String cleanupInputStream(InputStream is) throws IOException { + StringBuilder sb = new StringBuilder(); + if (is != null) { + try ( + InputStreamReader isr = new InputStreamReader(is); + BufferedReader br = new BufferedReader(isr) + ) { + // read the response body + String line; + while ((line = br.readLine()) != null) { + if (LOG.isDebugEnabled()) { + sb.append(line); + } + } + } finally { + is.close(); + } + } + return sb.toString(); + } + + @Override + public void sinkMetricData(Collection<TimelineMetrics> metrics) { + HttpURLConnection connection = null; + try { + connection = connectUrl.startsWith("https") ? getSSLConnection(connectUrl) : getConnection(connectUrl); + + connection.setRequestMethod("POST"); + connection.setRequestProperty("Content-Type", "application/json"); + connection.setRequestProperty("Connection", "Keep-Alive"); + connection.setConnectTimeout(getSinkTimeOutSeconds()); + connection.setReadTimeout(getSinkTimeOutSeconds()); + connection.setDoOutput(true); + + if (metrics != null) { + String jsonData = mapper.writeValueAsString(metrics); + try (OutputStream os = connection.getOutputStream()) { + os.write(jsonData.getBytes("UTF-8")); + } + } + + int statusCode = connection.getResponseCode(); + + if (statusCode != 200) { + LOG.info("Unable to POST metrics to external sink, " + connectUrl + + ", statusCode = " + statusCode); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Metrics posted to external sink " + connectUrl); + } + } + cleanupInputStream(connection.getInputStream()); + + } catch (IOException io) { + LOG.warn("Unable to sink data to external system.", io); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java new file mode 100644 index 0000000..b97c39f --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink; + +public class DefaultInternalMetricsSourceProvider implements InternalSourceProvider { + private static final Log LOG = LogFactory.getLog(DefaultInternalMetricsSourceProvider.class); + + // TODO: Implement read based sources for higher level data + @Override + public InternalMetricsSource getInternalMetricsSource(SOURCE_NAME sourceName, int sinkIntervalSeconds, ExternalMetricsSink sink) { + if (sink == null) { + LOG.warn("No external sink configured for source " + sourceName); + return null; + } + + switch (sourceName) { + case RAW_METRICS: + return new RawMetricsSource(sinkIntervalSeconds, sink); + default: + throw new UnsupportedOperationException("Unimplemented source type " + sourceName); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/InternalMetricsSource.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/InternalMetricsSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/InternalMetricsSource.java new file mode 100644 index 0000000..a6e1092 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/InternalMetricsSource.java @@ -0,0 +1,30 @@ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source; + +import java.util.Collection; + +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public interface InternalMetricsSource { + /** + * Write metrics to external sink. + * Allows pre-processing and caching capabilities to the consumer. + */ + void publishTimelineMetrics(Collection<TimelineMetrics> metrics); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/InternalSourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/InternalSourceProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/InternalSourceProvider.java new file mode 100644 index 0000000..9d8ca36 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/InternalSourceProvider.java @@ -0,0 +1,39 @@ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source; + +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public interface InternalSourceProvider { + + enum SOURCE_NAME { + RAW_METRICS, + MINUTE_HOST_AGGREAGATE_METRICS, + HOURLY_HOST_AGGREAGATE_METRICS, + DAILY_HOST_AGGREAGATE_METRICS, + MINUTE_CLUSTER_AGGREAGATE_METRICS, + HOURLY_CLUSTER_AGGREAGATE_METRICS, + DAILY_CLUSTER_AGGREAGATE_METRICS, + } + + /** + * Provide Source for metrics data. + * @return {@link InternalMetricsSource} + */ + InternalMetricsSource getInternalMetricsSource(SOURCE_NAME sourceName, int sinkIntervalSeconds, ExternalMetricsSink sink); +}