Repository: ambari Updated Branches: refs/heads/branch-3.0-ams 0e5094fdb -> 7e970233a
AMBARI-21079. Add ability to sink Raw metrics to external system via Http. Renamed files to fix build. (swagle) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7e970233 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7e970233 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7e970233 Branch: refs/heads/branch-3.0-ams Commit: 7e970233a5cf046903baf87d1e4b4d89264db4f3 Parents: 0e5094f Author: Siddharth Wagle <swa...@hortonworks.com> Authored: Tue May 30 16:45:27 2017 -0700 Committer: Siddharth Wagle <swa...@hortonworks.com> Committed: Tue May 30 16:45:27 2017 -0700 ---------------------------------------------------------------------- .../timeline/HBaseTimelineMetricStore.java | 544 ------------------- .../timeline/HBaseTimelineMetricsService.java | 544 +++++++++++++++++++ .../timeline/HBaseTimelineMetricStoreTest.java | 132 ----- .../HBaseTimelineMetricsServiceTest.java | 132 +++++ 4 files changed, 676 insertions(+), 676 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/7e970233/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java deleted file mode 100644 index 9ebc64c..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java +++ /dev/null @@ -1,544 +0,0 @@ -/** - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Multimap; -import org.apache.ambari.metrics.alertservice.spark.AmsKafkaProducer; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.metrics2.sink.timeline.AggregationResult; -import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; -import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; -import org.apache.hadoop.metrics2.sink.timeline.Precision; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.hadoop.metrics2.sink.timeline.TopNConfig; -import org.apache.hadoop.service.AbstractService; -import 
org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConditionBuilder; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.SeriesAggregateFunction; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunction; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunctionFactory; - -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; 
-import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.*; - -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES; - -public class HBaseTimelineMetricsService extends AbstractService implements TimelineMetricStore { - - static final Log LOG = LogFactory.getLog(HBaseTimelineMetricsService.class); - private final TimelineMetricConfiguration configuration; - private PhoenixHBaseAccessor hBaseAccessor; - private static volatile boolean isInitialized = false; - private final ScheduledExecutorService watchdogExecutorService = Executors.newSingleThreadScheduledExecutor(); - private final Map<AGGREGATOR_NAME, ScheduledExecutorService> scheduledExecutors = new HashMap<>(); - private TimelineMetricMetadataManager metricMetadataManager; - private Integer defaultTopNHostsLimit; - private MetricCollectorHAController haController; - private AmsKafkaProducer kafkaProducer = new AmsKafkaProducer("104.196.85.21:6667"); - - /** - * Construct the service. 
- * - */ - public HBaseTimelineMetricsService(TimelineMetricConfiguration configuration) { - super(HBaseTimelineMetricsService.class.getName()); - this.configuration = configuration; - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - super.serviceInit(conf); - initializeSubsystem(); - } - - private synchronized void initializeSubsystem() { - if (!isInitialized) { - hBaseAccessor = new PhoenixHBaseAccessor(null); - // Initialize schema - hBaseAccessor.initMetricSchema(); - // Initialize metadata from store - try { - metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor); - } catch (MalformedURLException | URISyntaxException e) { - throw new ExceptionInInitializerError("Unable to initialize metadata manager"); - } - metricMetadataManager.initializeMetadata(); - // Initialize policies before TTL update - hBaseAccessor.initPoliciesAndTTL(); - // Start HA service - // Start the controller - if (!configuration.isDistributedCollectorModeDisabled()) { - haController = new MetricCollectorHAController(configuration); - try { - haController.initializeHAController(); - } catch (Exception e) { - LOG.error(e); - throw new MetricsSystemInitializationException("Unable to " + - "initialize HA controller", e); - } - } else { - LOG.info("Distributed collector mode disabled"); - } - - //Initialize whitelisting & blacklisting if needed - TimelineMetricsFilter.initializeMetricFilter(configuration); - - Configuration metricsConf = null; - try { - metricsConf = configuration.getMetricsConf(); - } catch (Exception e) { - throw new ExceptionInInitializerError("Cannot initialize configuration."); - } - - defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(DEFAULT_TOPN_HOSTS_LIMIT, "20")); - if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) { - LOG.info("Using group by aggregators for aggregating host and cluster metrics."); - } - - // Start the cluster aggregator second - TimelineMetricAggregator 
secondClusterAggregator = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond( - hBaseAccessor, metricsConf, metricMetadataManager, haController); - scheduleAggregatorThread(secondClusterAggregator); - - // Start the minute cluster aggregator - TimelineMetricAggregator minuteClusterAggregator = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute( - hBaseAccessor, metricsConf, haController); - scheduleAggregatorThread(minuteClusterAggregator); - - // Start the hourly cluster aggregator - TimelineMetricAggregator hourlyClusterAggregator = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly( - hBaseAccessor, metricsConf, haController); - scheduleAggregatorThread(hourlyClusterAggregator); - - // Start the daily cluster aggregator - TimelineMetricAggregator dailyClusterAggregator = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily( - hBaseAccessor, metricsConf, haController); - scheduleAggregatorThread(dailyClusterAggregator); - - // Start the minute host aggregator - if (Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION, "true"))) { - LOG.info("timeline.metrics.host.inmemory.aggregation is set to True, disabling host minute aggregation on collector"); - } else { - TimelineMetricAggregator minuteHostAggregator = - TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute( - hBaseAccessor, metricsConf, haController); - scheduleAggregatorThread(minuteHostAggregator); - } - - // Start the hourly host aggregator - TimelineMetricAggregator hourlyHostAggregator = - TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly( - hBaseAccessor, metricsConf, haController); - scheduleAggregatorThread(hourlyHostAggregator); - - // Start the daily host aggregator - TimelineMetricAggregator dailyHostAggregator = - TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily( - hBaseAccessor, metricsConf, haController); - 
scheduleAggregatorThread(dailyHostAggregator); - - if (!configuration.isTimelineMetricsServiceWatcherDisabled()) { - int initDelay = configuration.getTimelineMetricsServiceWatcherInitDelay(); - int delay = configuration.getTimelineMetricsServiceWatcherDelay(); - // Start the watchdog - watchdogExecutorService.scheduleWithFixedDelay( - new TimelineMetricStoreWatcher(this, configuration), - initDelay, delay, TimeUnit.SECONDS); - LOG.info("Started watchdog for timeline metrics store with initial " + - "delay = " + initDelay + ", delay = " + delay); - } - - isInitialized = true; - } - - } - - @Override - protected void serviceStop() throws Exception { - super.serviceStop(); - } - - @Override - public TimelineMetrics getTimelineMetrics(List<String> metricNames, - List<String> hostnames, String applicationId, String instanceId, - Long startTime, Long endTime, Precision precision, Integer limit, - boolean groupedByHosts, TopNConfig topNConfig, String seriesAggregateFunction) throws SQLException, IOException { - - if (metricNames == null || metricNames.isEmpty()) { - throw new IllegalArgumentException("No metric name filter specified."); - } - if ((startTime == null && endTime != null) - || (startTime != null && endTime == null)) { - throw new IllegalArgumentException("Open ended query not supported "); - } - if (limit != null && limit > PhoenixHBaseAccessor.RESULTSET_LIMIT){ - throw new IllegalArgumentException("Limit too big"); - } - - TimelineMetricsSeriesAggregateFunction seriesAggrFunctionInstance = null; - if (!StringUtils.isEmpty(seriesAggregateFunction)) { - SeriesAggregateFunction func = SeriesAggregateFunction.getFunction(seriesAggregateFunction); - seriesAggrFunctionInstance = TimelineMetricsSeriesAggregateFunctionFactory.newInstance(func); - } - - Multimap<String, List<Function>> metricFunctions = - parseMetricNamesToAggregationFunctions(metricNames); - - ConditionBuilder conditionBuilder = new ConditionBuilder(new ArrayList<String>(metricFunctions.keySet())) - 
.hostnames(hostnames) - .appId(applicationId) - .instanceId(instanceId) - .startTime(startTime) - .endTime(endTime) - .precision(precision) - .limit(limit) - .grouped(groupedByHosts); - - if (topNConfig != null) { - if (TopNCondition.isTopNHostCondition(metricNames, hostnames) ^ //Only 1 condition should be true. - TopNCondition.isTopNMetricCondition(metricNames, hostnames)) { - conditionBuilder.topN(topNConfig.getTopN()); - conditionBuilder.isBottomN(topNConfig.getIsBottomN()); - Function.ReadFunction readFunction = Function.ReadFunction.getFunction(topNConfig.getTopNFunction()); - Function function = new Function(readFunction, null); - conditionBuilder.topNFunction(function); - } else { - LOG.info("Invalid Input for TopN query. Ignoring TopN Request."); - } - } else if (startTime != null && hostnames != null && hostnames.size() > defaultTopNHostsLimit) { - // if (timeseries query AND hostnames passed AND size(hostnames) > limit) - LOG.info("Requesting data for more than " + defaultTopNHostsLimit + " Hosts. 
" + - "Defaulting to Top " + defaultTopNHostsLimit); - conditionBuilder.topN(defaultTopNHostsLimit); - conditionBuilder.isBottomN(false); - } - - Condition condition = conditionBuilder.build(); - - TimelineMetrics metrics; - - if (hostnames == null || hostnames.isEmpty()) { - metrics = hBaseAccessor.getAggregateMetricRecords(condition, metricFunctions); - } else { - metrics = hBaseAccessor.getMetricRecords(condition, metricFunctions); - } - - metrics = postProcessMetrics(metrics); - - if (metrics.getMetrics().size() == 0) { - return metrics; - } - - return seriesAggregateMetrics(seriesAggrFunctionInstance, metrics); - } - - private TimelineMetrics postProcessMetrics(TimelineMetrics metrics) { - List<TimelineMetric> metricsList = metrics.getMetrics(); - - for (TimelineMetric metric : metricsList){ - String name = metric.getMetricName(); - if (name.contains("._rate")){ - updateValuesAsRate(metric.getMetricValues(), false); - } else if (name.contains("._diff")) { - updateValuesAsRate(metric.getMetricValues(), true); - } - } - - return metrics; - } - - private TimelineMetrics seriesAggregateMetrics(TimelineMetricsSeriesAggregateFunction seriesAggrFuncInstance, - TimelineMetrics metrics) { - if (seriesAggrFuncInstance != null) { - TimelineMetric appliedMetric = seriesAggrFuncInstance.apply(metrics); - metrics.setMetrics(Collections.singletonList(appliedMetric)); - } - return metrics; - } - - static Map<Long, Double> updateValuesAsRate(Map<Long, Double> metricValues, boolean isDiff) { - Long prevTime = null; - Double prevVal = null; - long step; - Double diff; - - for(Iterator<Map.Entry<Long, Double>> it = metricValues.entrySet().iterator(); it.hasNext(); ) { - Map.Entry<Long, Double> timeValueEntry = it.next(); - Long currTime = timeValueEntry.getKey(); - Double currVal = timeValueEntry.getValue(); - - if (prevTime != null) { - step = currTime - prevTime; - diff = currVal - prevVal; - Double rate = isDiff ? 
diff : (diff / TimeUnit.MILLISECONDS.toSeconds(step)); - timeValueEntry.setValue(rate); - } else { - it.remove(); - } - - prevTime = currTime; - prevVal = currVal; - } - - return metricValues; - } - - static Multimap<String, List<Function>> parseMetricNamesToAggregationFunctions(List<String> metricNames) { - Multimap<String, List<Function>> metricsFunctions = ArrayListMultimap.create(); - - for (String metricName : metricNames){ - Function function = Function.DEFAULT_VALUE_FUNCTION; - String cleanMetricName = metricName; - - try { - function = Function.fromMetricName(metricName); - int functionStartIndex = metricName.indexOf("._"); - if (functionStartIndex > 0) { - cleanMetricName = metricName.substring(0, functionStartIndex); - } - } catch (Function.FunctionFormatException ffe){ - // unknown function so - // fallback to VALUE, and fullMetricName - } - - List<Function> functionsList = new ArrayList<>(); - functionsList.add(function); - metricsFunctions.put(cleanMetricName, functionsList); - } - - return metricsFunctions; - } - - @Override - public TimelinePutResponse putMetrics(TimelineMetrics metrics) throws SQLException, IOException { - // Error indicated by the Sql exception - TimelinePutResponse response = new TimelinePutResponse(); - - try { - if (!metrics.getMetrics().isEmpty() && metrics.getMetrics().get(0).getAppId().equals("HOST")) { - kafkaProducer.sendMetrics(fromTimelineMetrics(metrics)); - } - } catch (InterruptedException | ExecutionException e) { - LOG.error(e); - } - hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, false); - - return response; - } - - - private org.apache.ambari.metrics.alertservice.common.TimelineMetrics fromTimelineMetrics(TimelineMetrics timelineMetrics) { - org.apache.ambari.metrics.alertservice.common.TimelineMetrics otherMetrics = new org.apache.ambari.metrics.alertservice.common.TimelineMetrics(); - - List<org.apache.ambari.metrics.alertservice.common.TimelineMetric> timelineMetricList = new 
ArrayList<>(); - for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) { - timelineMetricList.add(fromTimelineMetric(timelineMetric)); - } - otherMetrics.setMetrics(timelineMetricList); - return otherMetrics; - } - - private org.apache.ambari.metrics.alertservice.common.TimelineMetric fromTimelineMetric(TimelineMetric timelineMetric) { - - org.apache.ambari.metrics.alertservice.common.TimelineMetric otherMetric = new org.apache.ambari.metrics.alertservice.common.TimelineMetric(); - otherMetric.setMetricValues(timelineMetric.getMetricValues()); - otherMetric.setStartTime(timelineMetric.getStartTime()); - otherMetric.setHostName(timelineMetric.getHostName()); - otherMetric.setInstanceId(timelineMetric.getInstanceId()); - otherMetric.setAppId(timelineMetric.getAppId()); - otherMetric.setMetricName(timelineMetric.getMetricName()); - - return otherMetric; - } - - @Override - public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics) - throws SQLException, IOException { - hBaseAccessor.insertContainerMetrics(metrics); - return new TimelinePutResponse(); - } - - @Override - public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String query) throws SQLException, IOException { - Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadata = - metricMetadataManager.getMetadataCache(); - - boolean includeBlacklistedMetrics = StringUtils.isNotEmpty(query) && "all".equalsIgnoreCase(query); - - // Group Metadata by AppId - Map<String, List<TimelineMetricMetadata>> metadataByAppId = new HashMap<>(); - for (TimelineMetricMetadata metricMetadata : metadata.values()) { - - if (!includeBlacklistedMetrics && !metricMetadata.isWhitelisted()) { - continue; - } - List<TimelineMetricMetadata> metadataList = metadataByAppId.get(metricMetadata.getAppId()); - if (metadataList == null) { - metadataList = new ArrayList<>(); - metadataByAppId.put(metricMetadata.getAppId(), metadataList); - } - - metadataList.add(metricMetadata); - } - - 
return metadataByAppId; - } - - @Override - public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException { - return metricMetadataManager.getHostedAppsCache(); - } - - @Override - public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException { - Map<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<>(); - for (TimelineMetricWithAggregatedValues entry : aggregationResult.getResult()) { - aggregateMap.put(entry.getTimelineMetric(), entry.getMetricAggregate()); - } - hBaseAccessor.saveHostAggregateRecords(aggregateMap, PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME); - - - return new TimelinePutResponse(); - } - - @Override - public Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId) - throws SQLException, IOException { - - Map<String, Set<String>> hostedApps = metricMetadataManager.getHostedAppsCache(); - Map<String, Set<String>> instanceHosts = metricMetadataManager.getHostedInstanceCache(); - Map<String, Map<String, Set<String>>> instanceAppHosts = new HashMap<>(); - - if (MapUtils.isEmpty(instanceHosts)) { - Map<String, Set<String>> appHostMap = new HashMap<String, Set<String>>(); - for (String host : hostedApps.keySet()) { - for (String app : hostedApps.get(host)) { - if (!appHostMap.containsKey(app)) { - appHostMap.put(app, new HashSet<String>()); - } - appHostMap.get(app).add(host); - } - } - instanceAppHosts.put("", appHostMap); - } else { - for (String instance : instanceHosts.keySet()) { - - if (StringUtils.isNotEmpty(instanceId) && !instance.equals(instanceId)) { - continue; - } - Map<String, Set<String>> appHostMap = new HashMap<String, Set<String>>(); - instanceAppHosts.put(instance, appHostMap); - - Set<String> hostsWithInstance = instanceHosts.get(instance); - for (String host : hostsWithInstance) { - for (String app : hostedApps.get(host)) { - if (StringUtils.isNotEmpty(appId) && !app.equals(appId)) { - 
continue; - } - - if (!appHostMap.containsKey(app)) { - appHostMap.put(app, new HashSet<String>()); - } - appHostMap.get(app).add(host); - } - } - } - } - - return instanceAppHosts; - } - - @Override - public List<String> getLiveInstances() { - - List<String> instances = null; - try { - if (haController == null) { - // Always return current host as live (embedded operation mode) - return Collections.singletonList(configuration.getInstanceHostnameFromEnv()); - } - instances = haController.getLiveInstanceHostNames(); - if (instances == null || instances.isEmpty()) { - // fallback - instances = Collections.singletonList(configuration.getInstanceHostnameFromEnv()); - } - } catch (UnknownHostException e) { - LOG.debug("Exception on getting hostname from env.", e); - } - return instances; - } - - private void scheduleAggregatorThread(final TimelineMetricAggregator aggregator) { - if (!aggregator.isDisabled()) { - ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor( - new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, ACTUAL_AGGREGATOR_NAMES.get(aggregator.getName())); - } - } - ); - scheduledExecutors.put(aggregator.getName(), executorService); - executorService.scheduleAtFixedRate(aggregator, - 0l, - aggregator.getSleepIntervalMillis(), - TimeUnit.MILLISECONDS); - LOG.info("Scheduled aggregator thread " + aggregator.getName() + " every " + - + aggregator.getSleepIntervalMillis() + " milliseconds."); - } else { - LOG.info("Skipped scheduling " + aggregator.getName() + " since it is disabled."); - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/7e970233/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java new file mode 100644 index 0000000..9ebc64c --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java @@ -0,0 +1,544 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; +import org.apache.ambari.metrics.alertservice.spark.AmsKafkaProducer; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.AggregationResult; +import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; +import org.apache.hadoop.metrics2.sink.timeline.Precision; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.hadoop.metrics2.sink.timeline.TopNConfig; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; +import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConditionBuilder; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.SeriesAggregateFunction; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunction; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunctionFactory; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.*; + +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES; + +public class HBaseTimelineMetricsService extends AbstractService 
implements TimelineMetricStore { + + static final Log LOG = LogFactory.getLog(HBaseTimelineMetricsService.class); + private final TimelineMetricConfiguration configuration; + private PhoenixHBaseAccessor hBaseAccessor; + private static volatile boolean isInitialized = false; + private final ScheduledExecutorService watchdogExecutorService = Executors.newSingleThreadScheduledExecutor(); + private final Map<AGGREGATOR_NAME, ScheduledExecutorService> scheduledExecutors = new HashMap<>(); + private TimelineMetricMetadataManager metricMetadataManager; + private Integer defaultTopNHostsLimit; + private MetricCollectorHAController haController; + private AmsKafkaProducer kafkaProducer = new AmsKafkaProducer("104.196.85.21:6667"); + + /** + * Construct the service. + * + */ + public HBaseTimelineMetricsService(TimelineMetricConfiguration configuration) { + super(HBaseTimelineMetricsService.class.getName()); + this.configuration = configuration; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + initializeSubsystem(); + } + + private synchronized void initializeSubsystem() { + if (!isInitialized) { + hBaseAccessor = new PhoenixHBaseAccessor(null); + // Initialize schema + hBaseAccessor.initMetricSchema(); + // Initialize metadata from store + try { + metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor); + } catch (MalformedURLException | URISyntaxException e) { + throw new ExceptionInInitializerError("Unable to initialize metadata manager"); + } + metricMetadataManager.initializeMetadata(); + // Initialize policies before TTL update + hBaseAccessor.initPoliciesAndTTL(); + // Start HA service + // Start the controller + if (!configuration.isDistributedCollectorModeDisabled()) { + haController = new MetricCollectorHAController(configuration); + try { + haController.initializeHAController(); + } catch (Exception e) { + LOG.error(e); + throw new MetricsSystemInitializationException("Unable to " 
+ + "initialize HA controller", e); + } + } else { + LOG.info("Distributed collector mode disabled"); + } + + //Initialize whitelisting & blacklisting if needed + TimelineMetricsFilter.initializeMetricFilter(configuration); + + Configuration metricsConf = null; + try { + metricsConf = configuration.getMetricsConf(); + } catch (Exception e) { + throw new ExceptionInInitializerError("Cannot initialize configuration."); + } + + defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(DEFAULT_TOPN_HOSTS_LIMIT, "20")); + if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) { + LOG.info("Using group by aggregators for aggregating host and cluster metrics."); + } + + // Start the cluster aggregator second + TimelineMetricAggregator secondClusterAggregator = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond( + hBaseAccessor, metricsConf, metricMetadataManager, haController); + scheduleAggregatorThread(secondClusterAggregator); + + // Start the minute cluster aggregator + TimelineMetricAggregator minuteClusterAggregator = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute( + hBaseAccessor, metricsConf, haController); + scheduleAggregatorThread(minuteClusterAggregator); + + // Start the hourly cluster aggregator + TimelineMetricAggregator hourlyClusterAggregator = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly( + hBaseAccessor, metricsConf, haController); + scheduleAggregatorThread(hourlyClusterAggregator); + + // Start the daily cluster aggregator + TimelineMetricAggregator dailyClusterAggregator = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily( + hBaseAccessor, metricsConf, haController); + scheduleAggregatorThread(dailyClusterAggregator); + + // Start the minute host aggregator + if (Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION, "true"))) { + LOG.info("timeline.metrics.host.inmemory.aggregation is set to True, 
disabling host minute aggregation on collector"); + } else { + TimelineMetricAggregator minuteHostAggregator = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute( + hBaseAccessor, metricsConf, haController); + scheduleAggregatorThread(minuteHostAggregator); + } + + // Start the hourly host aggregator + TimelineMetricAggregator hourlyHostAggregator = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly( + hBaseAccessor, metricsConf, haController); + scheduleAggregatorThread(hourlyHostAggregator); + + // Start the daily host aggregator + TimelineMetricAggregator dailyHostAggregator = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily( + hBaseAccessor, metricsConf, haController); + scheduleAggregatorThread(dailyHostAggregator); + + if (!configuration.isTimelineMetricsServiceWatcherDisabled()) { + int initDelay = configuration.getTimelineMetricsServiceWatcherInitDelay(); + int delay = configuration.getTimelineMetricsServiceWatcherDelay(); + // Start the watchdog + watchdogExecutorService.scheduleWithFixedDelay( + new TimelineMetricStoreWatcher(this, configuration), + initDelay, delay, TimeUnit.SECONDS); + LOG.info("Started watchdog for timeline metrics store with initial " + + "delay = " + initDelay + ", delay = " + delay); + } + + isInitialized = true; + } + + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } + + @Override + public TimelineMetrics getTimelineMetrics(List<String> metricNames, + List<String> hostnames, String applicationId, String instanceId, + Long startTime, Long endTime, Precision precision, Integer limit, + boolean groupedByHosts, TopNConfig topNConfig, String seriesAggregateFunction) throws SQLException, IOException { + + if (metricNames == null || metricNames.isEmpty()) { + throw new IllegalArgumentException("No metric name filter specified."); + } + if ((startTime == null && endTime != null) + || (startTime != null && endTime == null)) { + throw 
new IllegalArgumentException("Open ended query not supported "); + } + if (limit != null && limit > PhoenixHBaseAccessor.RESULTSET_LIMIT){ + throw new IllegalArgumentException("Limit too big"); + } + + TimelineMetricsSeriesAggregateFunction seriesAggrFunctionInstance = null; + if (!StringUtils.isEmpty(seriesAggregateFunction)) { + SeriesAggregateFunction func = SeriesAggregateFunction.getFunction(seriesAggregateFunction); + seriesAggrFunctionInstance = TimelineMetricsSeriesAggregateFunctionFactory.newInstance(func); + } + + Multimap<String, List<Function>> metricFunctions = + parseMetricNamesToAggregationFunctions(metricNames); + + ConditionBuilder conditionBuilder = new ConditionBuilder(new ArrayList<String>(metricFunctions.keySet())) + .hostnames(hostnames) + .appId(applicationId) + .instanceId(instanceId) + .startTime(startTime) + .endTime(endTime) + .precision(precision) + .limit(limit) + .grouped(groupedByHosts); + + if (topNConfig != null) { + if (TopNCondition.isTopNHostCondition(metricNames, hostnames) ^ //Only 1 condition should be true. + TopNCondition.isTopNMetricCondition(metricNames, hostnames)) { + conditionBuilder.topN(topNConfig.getTopN()); + conditionBuilder.isBottomN(topNConfig.getIsBottomN()); + Function.ReadFunction readFunction = Function.ReadFunction.getFunction(topNConfig.getTopNFunction()); + Function function = new Function(readFunction, null); + conditionBuilder.topNFunction(function); + } else { + LOG.info("Invalid Input for TopN query. Ignoring TopN Request."); + } + } else if (startTime != null && hostnames != null && hostnames.size() > defaultTopNHostsLimit) { + // if (timeseries query AND hostnames passed AND size(hostnames) > limit) + LOG.info("Requesting data for more than " + defaultTopNHostsLimit + " Hosts. 
" + + "Defaulting to Top " + defaultTopNHostsLimit); + conditionBuilder.topN(defaultTopNHostsLimit); + conditionBuilder.isBottomN(false); + } + + Condition condition = conditionBuilder.build(); + + TimelineMetrics metrics; + + if (hostnames == null || hostnames.isEmpty()) { + metrics = hBaseAccessor.getAggregateMetricRecords(condition, metricFunctions); + } else { + metrics = hBaseAccessor.getMetricRecords(condition, metricFunctions); + } + + metrics = postProcessMetrics(metrics); + + if (metrics.getMetrics().size() == 0) { + return metrics; + } + + return seriesAggregateMetrics(seriesAggrFunctionInstance, metrics); + } + + private TimelineMetrics postProcessMetrics(TimelineMetrics metrics) { + List<TimelineMetric> metricsList = metrics.getMetrics(); + + for (TimelineMetric metric : metricsList){ + String name = metric.getMetricName(); + if (name.contains("._rate")){ + updateValuesAsRate(metric.getMetricValues(), false); + } else if (name.contains("._diff")) { + updateValuesAsRate(metric.getMetricValues(), true); + } + } + + return metrics; + } + + private TimelineMetrics seriesAggregateMetrics(TimelineMetricsSeriesAggregateFunction seriesAggrFuncInstance, + TimelineMetrics metrics) { + if (seriesAggrFuncInstance != null) { + TimelineMetric appliedMetric = seriesAggrFuncInstance.apply(metrics); + metrics.setMetrics(Collections.singletonList(appliedMetric)); + } + return metrics; + } + + static Map<Long, Double> updateValuesAsRate(Map<Long, Double> metricValues, boolean isDiff) { + Long prevTime = null; + Double prevVal = null; + long step; + Double diff; + + for(Iterator<Map.Entry<Long, Double>> it = metricValues.entrySet().iterator(); it.hasNext(); ) { + Map.Entry<Long, Double> timeValueEntry = it.next(); + Long currTime = timeValueEntry.getKey(); + Double currVal = timeValueEntry.getValue(); + + if (prevTime != null) { + step = currTime - prevTime; + diff = currVal - prevVal; + Double rate = isDiff ? 
diff : (diff / TimeUnit.MILLISECONDS.toSeconds(step)); + timeValueEntry.setValue(rate); + } else { + it.remove(); + } + + prevTime = currTime; + prevVal = currVal; + } + + return metricValues; + } + + static Multimap<String, List<Function>> parseMetricNamesToAggregationFunctions(List<String> metricNames) { + Multimap<String, List<Function>> metricsFunctions = ArrayListMultimap.create(); + + for (String metricName : metricNames){ + Function function = Function.DEFAULT_VALUE_FUNCTION; + String cleanMetricName = metricName; + + try { + function = Function.fromMetricName(metricName); + int functionStartIndex = metricName.indexOf("._"); + if (functionStartIndex > 0) { + cleanMetricName = metricName.substring(0, functionStartIndex); + } + } catch (Function.FunctionFormatException ffe){ + // unknown function so + // fallback to VALUE, and fullMetricName + } + + List<Function> functionsList = new ArrayList<>(); + functionsList.add(function); + metricsFunctions.put(cleanMetricName, functionsList); + } + + return metricsFunctions; + } + + @Override + public TimelinePutResponse putMetrics(TimelineMetrics metrics) throws SQLException, IOException { + // Error indicated by the Sql exception + TimelinePutResponse response = new TimelinePutResponse(); + + try { + if (!metrics.getMetrics().isEmpty() && metrics.getMetrics().get(0).getAppId().equals("HOST")) { + kafkaProducer.sendMetrics(fromTimelineMetrics(metrics)); + } + } catch (InterruptedException | ExecutionException e) { + LOG.error(e); + } + hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, false); + + return response; + } + + + private org.apache.ambari.metrics.alertservice.common.TimelineMetrics fromTimelineMetrics(TimelineMetrics timelineMetrics) { + org.apache.ambari.metrics.alertservice.common.TimelineMetrics otherMetrics = new org.apache.ambari.metrics.alertservice.common.TimelineMetrics(); + + List<org.apache.ambari.metrics.alertservice.common.TimelineMetric> timelineMetricList = new 
ArrayList<>(); + for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) { + timelineMetricList.add(fromTimelineMetric(timelineMetric)); + } + otherMetrics.setMetrics(timelineMetricList); + return otherMetrics; + } + + private org.apache.ambari.metrics.alertservice.common.TimelineMetric fromTimelineMetric(TimelineMetric timelineMetric) { + + org.apache.ambari.metrics.alertservice.common.TimelineMetric otherMetric = new org.apache.ambari.metrics.alertservice.common.TimelineMetric(); + otherMetric.setMetricValues(timelineMetric.getMetricValues()); + otherMetric.setStartTime(timelineMetric.getStartTime()); + otherMetric.setHostName(timelineMetric.getHostName()); + otherMetric.setInstanceId(timelineMetric.getInstanceId()); + otherMetric.setAppId(timelineMetric.getAppId()); + otherMetric.setMetricName(timelineMetric.getMetricName()); + + return otherMetric; + } + + @Override + public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics) + throws SQLException, IOException { + hBaseAccessor.insertContainerMetrics(metrics); + return new TimelinePutResponse(); + } + + @Override + public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String query) throws SQLException, IOException { + Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadata = + metricMetadataManager.getMetadataCache(); + + boolean includeBlacklistedMetrics = StringUtils.isNotEmpty(query) && "all".equalsIgnoreCase(query); + + // Group Metadata by AppId + Map<String, List<TimelineMetricMetadata>> metadataByAppId = new HashMap<>(); + for (TimelineMetricMetadata metricMetadata : metadata.values()) { + + if (!includeBlacklistedMetrics && !metricMetadata.isWhitelisted()) { + continue; + } + List<TimelineMetricMetadata> metadataList = metadataByAppId.get(metricMetadata.getAppId()); + if (metadataList == null) { + metadataList = new ArrayList<>(); + metadataByAppId.put(metricMetadata.getAppId(), metadataList); + } + + metadataList.add(metricMetadata); + } + + 
return metadataByAppId; + } + + @Override + public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException { + return metricMetadataManager.getHostedAppsCache(); + } + + @Override + public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException { + Map<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<>(); + for (TimelineMetricWithAggregatedValues entry : aggregationResult.getResult()) { + aggregateMap.put(entry.getTimelineMetric(), entry.getMetricAggregate()); + } + hBaseAccessor.saveHostAggregateRecords(aggregateMap, PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME); + + + return new TimelinePutResponse(); + } + + @Override + public Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId) + throws SQLException, IOException { + + Map<String, Set<String>> hostedApps = metricMetadataManager.getHostedAppsCache(); + Map<String, Set<String>> instanceHosts = metricMetadataManager.getHostedInstanceCache(); + Map<String, Map<String, Set<String>>> instanceAppHosts = new HashMap<>(); + + if (MapUtils.isEmpty(instanceHosts)) { + Map<String, Set<String>> appHostMap = new HashMap<String, Set<String>>(); + for (String host : hostedApps.keySet()) { + for (String app : hostedApps.get(host)) { + if (!appHostMap.containsKey(app)) { + appHostMap.put(app, new HashSet<String>()); + } + appHostMap.get(app).add(host); + } + } + instanceAppHosts.put("", appHostMap); + } else { + for (String instance : instanceHosts.keySet()) { + + if (StringUtils.isNotEmpty(instanceId) && !instance.equals(instanceId)) { + continue; + } + Map<String, Set<String>> appHostMap = new HashMap<String, Set<String>>(); + instanceAppHosts.put(instance, appHostMap); + + Set<String> hostsWithInstance = instanceHosts.get(instance); + for (String host : hostsWithInstance) { + for (String app : hostedApps.get(host)) { + if (StringUtils.isNotEmpty(appId) && !app.equals(appId)) { + 
continue; + } + + if (!appHostMap.containsKey(app)) { + appHostMap.put(app, new HashSet<String>()); + } + appHostMap.get(app).add(host); + } + } + } + } + + return instanceAppHosts; + } + + @Override + public List<String> getLiveInstances() { + + List<String> instances = null; + try { + if (haController == null) { + // Always return current host as live (embedded operation mode) + return Collections.singletonList(configuration.getInstanceHostnameFromEnv()); + } + instances = haController.getLiveInstanceHostNames(); + if (instances == null || instances.isEmpty()) { + // fallback + instances = Collections.singletonList(configuration.getInstanceHostnameFromEnv()); + } + } catch (UnknownHostException e) { + LOG.debug("Exception on getting hostname from env.", e); + } + return instances; + } + + private void scheduleAggregatorThread(final TimelineMetricAggregator aggregator) { + if (!aggregator.isDisabled()) { + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, ACTUAL_AGGREGATOR_NAMES.get(aggregator.getName())); + } + } + ); + scheduledExecutors.put(aggregator.getName(), executorService); + executorService.scheduleAtFixedRate(aggregator, + 0l, + aggregator.getSleepIntervalMillis(), + TimeUnit.MILLISECONDS); + LOG.info("Scheduled aggregator thread " + aggregator.getName() + " every " + + + aggregator.getSleepIntervalMillis() + " milliseconds."); + } else { + LOG.info("Skipped scheduling " + aggregator.getName() + " since it is disabled."); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/7e970233/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java deleted file mode 100644 index f035678..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - -import com.google.common.collect.Multimap; -import junit.framework.Assert; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function; -import org.junit.Test; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.AVG; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.SUM; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.PostProcessingFunction.RATE; -import static org.assertj.core.api.Assertions.assertThat; - -public class HBaseTimelineMetricsServiceTest { - - public static final String MEM_METRIC = "mem"; - public static final String BYTES_IN_METRIC = "bytes_in"; - public static final String BYTES_NOT_AFUNCTION_METRIC = "bytes._not._afunction"; - - @Test - public void testParseMetricNamesToAggregationFunctions() throws Exception { - //giwen - List<String> metricNames = Arrays.asList( - MEM_METRIC + "._avg", - MEM_METRIC + "._sum", - MEM_METRIC + "._rate._avg", - BYTES_IN_METRIC, - BYTES_NOT_AFUNCTION_METRIC); - - //when - Multimap<String, List<Function>> multimap = - HBaseTimelineMetricsService.parseMetricNamesToAggregationFunctions(metricNames); - - //then - Assert.assertEquals(multimap.keySet().size(), 3); - Assert.assertTrue(multimap.containsKey(MEM_METRIC)); - Assert.assertTrue(multimap.containsKey(BYTES_IN_METRIC)); - Assert.assertTrue(multimap.containsKey(BYTES_NOT_AFUNCTION_METRIC)); - - List<List<Function>> metricEntry = (List<List<Function>>) multimap.get(MEM_METRIC); - HashMap<String, List<Function>> mfm = new HashMap<String, List<Function>>(); - mfm.put(MEM_METRIC, metricEntry.get(0)); - - 
assertThat(mfm.get(MEM_METRIC)).containsOnly( - new Function(AVG, null)); - - mfm = new HashMap<String, List<Function>>(); - mfm.put(MEM_METRIC, metricEntry.get(1)); - assertThat(mfm.get(MEM_METRIC)).containsOnly( - new Function(SUM, null)); - - mfm = new HashMap<String, List<Function>>(); - mfm.put(MEM_METRIC, metricEntry.get(2)); - assertThat(mfm.get(MEM_METRIC)).containsOnly( - new Function(AVG, RATE)); - - metricEntry = (List<List<Function>>) multimap.get(BYTES_IN_METRIC); - mfm = new HashMap<String, List<Function>>(); - mfm.put(BYTES_IN_METRIC, metricEntry.get(0)); - - assertThat(mfm.get(BYTES_IN_METRIC)) - .contains(Function.DEFAULT_VALUE_FUNCTION); - - metricEntry = (List<List<Function>>) multimap.get(BYTES_NOT_AFUNCTION_METRIC); - mfm = new HashMap<String, List<Function>>(); - mfm.put(BYTES_NOT_AFUNCTION_METRIC, metricEntry.get(0)); - - assertThat(mfm.get(BYTES_NOT_AFUNCTION_METRIC)) - .contains(Function.DEFAULT_VALUE_FUNCTION); - - } - - @Test - public void testRateCalculationOnMetricsWithEqualValues() throws Exception { - Map<Long, Double> metricValues = new TreeMap<>(); - metricValues.put(1454016368371L, 1011.25); - metricValues.put(1454016428371L, 1011.25); - metricValues.put(1454016488371L, 1011.25); - metricValues.put(1454016548371L, 1011.25); - metricValues.put(1454016608371L, 1011.25); - metricValues.put(1454016668371L, 1011.25); - metricValues.put(1454016728371L, 1011.25); - - // Calculate rate - Map<Long, Double> rates = HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), false); - - // Make sure rate is zero - for (Map.Entry<Long, Double> rateEntry : rates.entrySet()) { - Assert.assertEquals("Rate should be zero, key = " + rateEntry.getKey() - + ", value = " + rateEntry.getValue(), 0.0, rateEntry.getValue()); - } - } - - @Test - public void testDiffCalculation() throws Exception { - Map<Long, Double> metricValues = new TreeMap<>(); - metricValues.put(1454016368371L, 1011.25); - metricValues.put(1454016428371L, 1010.25); - 
metricValues.put(1454016488371L, 1012.25); - metricValues.put(1454016548371L, 1010.25); - metricValues.put(1454016608371L, 1010.25); - - Map<Long, Double> rates = HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), true); - - Assert.assertTrue(rates.size()==4); - Assert.assertTrue(rates.containsValue(-1.0)); - Assert.assertTrue(rates.containsValue(2.0)); - Assert.assertTrue(rates.containsValue(0.0)); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/7e970233/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsServiceTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsServiceTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsServiceTest.java new file mode 100644 index 0000000..f035678 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsServiceTest.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;

import com.google.common.collect.Multimap;
import junit.framework.Assert;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
import org.junit.Test;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.AVG;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.SUM;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.PostProcessingFunction.RATE;
import static org.assertj.core.api.Assertions.assertThat;

/**
 * Unit tests for the static helpers of {@link HBaseTimelineMetricsService}: metric-name
 * parsing and rate/diff post-processing.
 */
public class HBaseTimelineMetricsServiceTest {

  public static final String MEM_METRIC = "mem";
  public static final String BYTES_IN_METRIC = "bytes_in";
  public static final String BYTES_NOT_AFUNCTION_METRIC = "bytes._not._afunction";

  /**
   * Function suffixes are stripped from the metric name and parsed; names without a
   * (valid) function suffix keep their full name and the default value function.
   */
  @Test
  public void testParseMetricNamesToAggregationFunctions() throws Exception {
    // given
    List<String> metricNames = Arrays.asList(
      MEM_METRIC + "._avg",
      MEM_METRIC + "._sum",
      MEM_METRIC + "._rate._avg",
      BYTES_IN_METRIC,
      BYTES_NOT_AFUNCTION_METRIC);

    // when
    Multimap<String, List<Function>> multimap =
      HBaseTimelineMetricsService.parseMetricNamesToAggregationFunctions(metricNames);

    // then
    Assert.assertEquals(3, multimap.keySet().size());
    Assert.assertTrue(multimap.containsKey(MEM_METRIC));
    Assert.assertTrue(multimap.containsKey(BYTES_IN_METRIC));
    Assert.assertTrue(multimap.containsKey(BYTES_NOT_AFUNCTION_METRIC));

    // The three "mem" requests collapse onto one key, in request order.
    List<List<Function>> memFunctions = (List<List<Function>>) multimap.get(MEM_METRIC);
    Assert.assertEquals(3, memFunctions.size());
    assertThat(memFunctions.get(0)).containsOnly(new Function(AVG, null));
    assertThat(memFunctions.get(1)).containsOnly(new Function(SUM, null));
    assertThat(memFunctions.get(2)).containsOnly(new Function(AVG, RATE));

    List<List<Function>> bytesInFunctions = (List<List<Function>>) multimap.get(BYTES_IN_METRIC);
    assertThat(bytesInFunctions.get(0))
      .contains(Function.DEFAULT_VALUE_FUNCTION);

    List<List<Function>> notAFunction = (List<List<Function>>) multimap.get(BYTES_NOT_AFUNCTION_METRIC);
    assertThat(notAFunction.get(0))
      .contains(Function.DEFAULT_VALUE_FUNCTION);
  }

  /** A constant series must yield a rate of zero for every remaining point. */
  @Test
  public void testRateCalculationOnMetricsWithEqualValues() throws Exception {
    Map<Long, Double> metricValues = new TreeMap<>();
    metricValues.put(1454016368371L, 1011.25);
    metricValues.put(1454016428371L, 1011.25);
    metricValues.put(1454016488371L, 1011.25);
    metricValues.put(1454016548371L, 1011.25);
    metricValues.put(1454016608371L, 1011.25);
    metricValues.put(1454016668371L, 1011.25);
    metricValues.put(1454016728371L, 1011.25);

    // Calculate rate
    Map<Long, Double> rates = HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), false);

    // Make sure rate is zero
    for (Map.Entry<Long, Double> rateEntry : rates.entrySet()) {
      Assert.assertEquals("Rate should be zero, key = " + rateEntry.getKey()
        + ", value = " + rateEntry.getValue(), 0.0, rateEntry.getValue(), 0.0);
    }
  }

  /** Diff mode drops the first point and reports plain value differences. */
  @Test
  public void testDiffCalculation() throws Exception {
    Map<Long, Double> metricValues = new TreeMap<>();
    metricValues.put(1454016368371L, 1011.25);
    metricValues.put(1454016428371L, 1010.25);
    metricValues.put(1454016488371L, 1012.25);
    metricValues.put(1454016548371L, 1010.25);
    metricValues.put(1454016608371L, 1010.25);

    Map<Long, Double> rates = HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), true);

    Assert.assertEquals(4, rates.size());
    Assert.assertTrue(rates.containsValue(-1.0));
    Assert.assertTrue(rates.containsValue(2.0));
    Assert.assertTrue(rates.containsValue(0.0));
  }
}