AMBARI-14882. AMS aggregates Counter values as average over the timeseries (and other issues). (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/58b91c84 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/58b91c84 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/58b91c84 Branch: refs/heads/branch-dev-patch-upgrade Commit: 58b91c845e6a260602373499c22a384f4ca7cbdb Parents: 7632b7b Author: Siddharth Wagle <swa...@hortonworks.com> Authored: Tue Feb 2 16:38:03 2016 -0800 Committer: Siddharth Wagle <swa...@hortonworks.com> Committed: Tue Feb 2 16:38:03 2016 -0800 ---------------------------------------------------------------------- .../timeline/AbstractTimelineMetricsSink.java | 4 +- .../metrics2/sink/timeline/TimelineMetric.java | 13 ++- .../sink/timeline/TimelineMetricMetadata.java | 15 +-- .../timeline/cache/TimelineMetricsCache.java | 9 +- .../sink/flume/FlumeTimelineMetricsSink.java | 7 +- .../timeline/HadoopTimelineMetricsSink.java | 6 +- .../timeline/HadoopTimelineMetricsSinkTest.java | 5 +- .../kafka/KafkaTimelineMetricsReporter.java | 16 +-- .../storm/StormTimelineMetricsReporter.java | 2 - .../timeline/HBaseTimelineMetricStore.java | 19 ++-- .../metrics/timeline/PhoenixHBaseAccessor.java | 111 +++++++------------ .../metrics/timeline/aggregators/Function.java | 75 ++++++++++--- .../aggregators/TimelineMetricReadHelper.java | 38 +++++++ .../TimelineMetricMetadataManager.java | 5 +- .../metrics/timeline/FunctionTest.java | 10 +- .../timeline/HBaseTimelineMetricStoreTest.java | 31 +++++- 16 files changed, 233 insertions(+), 133 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/58b91c84/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java index 6d7c55f..9173889 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java @@ -37,6 +37,7 @@ public abstract class AbstractTimelineMetricsSink { public static final String COLLECTOR_HOST_PROPERTY = "collector"; public static final String COLLECTOR_PORT_PROPERTY = "port"; public static final int DEFAULT_POST_TIMEOUT_SECONDS = 10; + public static final String SKIP_COUNTER_TRANSFROMATION = "skipCounterDerivative"; protected final Log LOG; @@ -60,8 +61,7 @@ public abstract class AbstractTimelineMetricsSink { try { String jsonData = mapper.writeValueAsString(metrics); - HttpURLConnection connection = - (HttpURLConnection) new URL(connectUrl).openConnection(); + HttpURLConnection connection = (HttpURLConnection) new URL(connectUrl).openConnection(); connection.setRequestMethod("POST"); connection.setRequestProperty("Content-Type", "application/json"); http://git-wip-us.apache.org/repos/asf/ambari/blob/58b91c84/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java index e4dc423..98f4978 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java @@ -42,6 +42,7 @@ public class TimelineMetric implements Comparable<TimelineMetric> { private long timestamp; private long startTime; private String type; + private String units; private TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>(); // default @@ -53,6 +54,7 @@ public class TimelineMetric implements Comparable<TimelineMetric> { public TimelineMetric(TimelineMetric metric) { setMetricName(metric.getMetricName()); setType(metric.getType()); + setUnits(metric.getUnits()); setTimestamp(metric.getTimestamp()); setAppId(metric.getAppId()); setInstanceId(metric.getInstanceId()); @@ -115,7 +117,7 @@ public class TimelineMetric implements Comparable<TimelineMetric> { this.startTime = startTime; } - @XmlElement(name = "type") + @XmlElement(name = "type", defaultValue = "UNDEFINED") public String getType() { return type; } @@ -124,6 +126,15 @@ public class TimelineMetric implements Comparable<TimelineMetric> { this.type = type; } + @XmlElement(name = "units") + public String getUnits() { + return units; + } + + public void setUnits(String units) { + this.units = units; + } + @XmlElement(name = "metrics") public TreeMap<Long, Double> getMetricValues() { return metricValues; http://git-wip-us.apache.org/repos/asf/ambari/blob/58b91c84/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java index 0624f9c..1f413a0 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java @@ -33,16 +33,17 @@ public class TimelineMetricMetadata { private String metricName; private String appId; private String units; - private MetricType type = MetricType.UNDEFINED; + private String type = "UNDEFINED"; private Long seriesStartTime; boolean supportsAggregates = true; // Serialization ignored helper flag boolean isPersisted = false; + // Placeholder to add more type later public enum MetricType { - GAUGE, // Can vary in both directions - COUNTER, // Single dimension - UNDEFINED // Default + GAUGE, + COUNTER, + UNDEFINED } // Default constructor @@ -50,7 +51,7 @@ public class TimelineMetricMetadata { } public TimelineMetricMetadata(String metricName, String appId, String units, - MetricType type, Long seriesStartTime, + String type, Long seriesStartTime, boolean supportsAggregates) { this.metricName = metricName; this.appId = appId; @@ -89,11 +90,11 @@ public class TimelineMetricMetadata { } @XmlElement(name = "type") - public MetricType getType() { + public String getType() { return type; } - public void setType(MetricType type) { + public void setType(String type) { this.type = type; } http://git-wip-us.apache.org/repos/asf/ambari/blob/58b91c84/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java index 4e9e36e..15bd5f4 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java @@ -38,11 +38,18 @@ public class TimelineMetricsCache { public static final int MAX_EVICTION_TIME_MILLIS = 59000; // ~ 1 min private final int maxRecsPerName; private final int maxEvictionTimeInMillis; + private boolean skipCounterTransform = true; private final Map<String, Double> counterMetricLastValue = new HashMap<String, Double>(); public TimelineMetricsCache(int maxRecsPerName, int maxEvictionTimeInMillis) { + this(maxRecsPerName, maxEvictionTimeInMillis, false); + } + + public TimelineMetricsCache(int maxRecsPerName, int maxEvictionTimeInMillis, + boolean skipCounterTransform) { this.maxRecsPerName = maxRecsPerName; this.maxEvictionTimeInMillis = maxEvictionTimeInMillis; + this.skipCounterTransform = skipCounterTransform; } class TimelineMetricWrapper { @@ -171,7 +178,7 @@ public class TimelineMetricsCache { } public void putTimelineMetric(TimelineMetric timelineMetric, boolean isCounter) { - if (isCounter) { + if (isCounter && !skipCounterTransform) { transformMetricValuesToDerivative(timelineMetric); } putTimelineMetric(timelineMetric); http://git-wip-us.apache.org/repos/asf/ambari/blob/58b91c84/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java index 0257ada..cf2b4ae 100644 --- a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java @@ -135,12 +135,11 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem public void run() { LOG.debug("Collecting Metrics for Flume"); try { - Map<String, Map<String, String>> metricsMap = - JMXPollUtil.getAllMBeans(); + Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans(); long currentTimeMillis = System.currentTimeMillis(); for (String component : metricsMap.keySet()) { Map<String, String> attributeMap = metricsMap.get(component); - LOG.info("Attributes for component " + component); + LOG.debug("Attributes for component " + component); processComponentAttributes(currentTimeMillis, component, attributeMap); } } catch (UnableToConnectException uce) { @@ -188,8 +187,6 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem timelineMetric.setInstanceId(component); timelineMetric.setAppId("FLUME_HANDLER"); timelineMetric.setStartTime(currentTimeMillis); - timelineMetric.setType(ClassUtils.getShortCanonicalName( - attributeValue, "Number")); timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue)); return timelineMetric; } http://git-wip-us.apache.org/repos/asf/ambari/blob/58b91c84/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java index f23dc42..000b82e 100644 --- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java @@ -96,7 +96,9 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT); int metricsSendInterval = conf.getInt(METRICS_SEND_INTERVAL, TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS); // ~ 1 min - metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval); + // Skip aggregation of counter values by calculating derivative + metricsCache = new TimelineMetricsCache(maxRowCacheSize, + metricsSendInterval, conf.getBoolean(SKIP_COUNTER_TRANSFROMATION, true)); conf.setListDelimiter(','); Iterator<String> it = (Iterator<String>) conf.getKeys(); @@ -186,7 +188,7 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple timelineMetric.setHostName(hostName); timelineMetric.setAppId(serviceName); timelineMetric.setStartTime(startTime); - timelineMetric.setType(ClassUtils.getShortCanonicalName(value, "Number")); + timelineMetric.setType(metric.type() != null ? metric.type().name() : null); timelineMetric.getMetricValues().put(startTime, value.doubleValue()); // Put intermediate values into the cache until it is time to send boolean isCounter = MetricType.COUNTER == metric.type(); http://git-wip-us.apache.org/repos/asf/ambari/blob/58b91c84/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java index a69b7c7..6b23f36 100644 --- a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java +++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java @@ -40,6 +40,7 @@ import java.util.List; import org.apache.commons.configuration.SubsetConfiguration; import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.metrics2.MetricType; import org.apache.hadoop.metrics2.MetricsRecord; import org.easymock.EasyMock; import org.easymock.IAnswer; @@ -90,6 +91,7 @@ public class HadoopTimelineMetricsSinkTest { AbstractMetric metric = createNiceMock(AbstractMetric.class); expect(metric.name()).andReturn("metricName").anyTimes(); expect(metric.value()).andReturn(9.5687).anyTimes(); + expect(metric.type()).andReturn(MetricType.COUNTER).anyTimes(); //TODO currently only numeric metrics are supported MetricsRecord record = createNiceMock(MetricsRecord.class); @@ -104,7 +106,6 @@ public class HadoopTimelineMetricsSinkTest { expect(record.metrics()).andReturn(Arrays.asList(metric)).anyTimes(); - replay(conf, record, metric); sink.init(conf); @@ -239,4 +240,6 @@ public class HadoopTimelineMetricsSinkTest { Assert.assertEquals(new Double(5.0), values.next()); Assert.assertEquals(new Double(6.0), values.next()); } + + } http://git-wip-us.apache.org/repos/asf/ambari/blob/58b91c84/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java index ff2db1d..4915435 100644 --- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java +++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java @@ -42,7 +42,6 @@ import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; - import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -51,7 +50,7 @@ import java.util.List; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.TimeUnit; - +import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata.MetricType; import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS; import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT; @@ -280,7 +279,7 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink String[] metricNames = cacheKafkaMetered(currentTimeMillis, sanitizedName, meter); - populateMetricsList(context, metricNames); + populateMetricsList(context, MetricType.GAUGE, metricNames); } @Override @@ -291,7 +290,7 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink final String metricCountName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, COUNT_SUFIX, counter.count()); - populateMetricsList(context, metricCountName); + populateMetricsList(context, MetricType.COUNTER, metricCountName); } @Override @@ -305,7 +304,7 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink String[] metricNames = (String[]) ArrayUtils.addAll(metricHNames, metricSNames); - populateMetricsList(context, metricNames); + populateMetricsList(context, MetricType.GAUGE, metricNames); } @Override @@ -321,7 +320,7 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink String[] metricNames = (String[]) ArrayUtils.addAll(metricMNames, metricTNames); metricNames = (String[]) ArrayUtils.addAll(metricNames, metricSNames); - populateMetricsList(context, metricNames); + populateMetricsList(context, MetricType.GAUGE, metricNames); } @Override @@ -331,7 +330,7 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, "", Double.parseDouble(String.valueOf(gauge.value()))); - populateMetricsList(context, sanitizedName); + populateMetricsList(context, MetricType.GAUGE, sanitizedName); } private String[] cacheKafkaMetered(long currentTimeMillis, String sanitizedName, Metered meter) { @@ -393,10 +392,11 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink return meterName; } - private void populateMetricsList(Context context, String... metricNames) { + private void populateMetricsList(Context context, MetricType type, String... metricNames) { for (String metricName : metricNames) { TimelineMetric cachedMetric = metricsCache.getTimelineMetric(metricName); if (cachedMetric != null) { + cachedMetric.setType(type.name()); context.getTimelineMetricList().add(cachedMetric); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/58b91c84/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java index 73e3de8..f054f16 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java @@ -153,8 +153,6 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink timelineMetric.setHostName(hostname); timelineMetric.setAppId(component); timelineMetric.setStartTime(currentTimeMillis); - timelineMetric.setType(ClassUtils.getShortCanonicalName( - attributeValue, "Number")); timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue)); return timelineMetric; } http://git-wip-us.apache.org/repos/asf/ambari/blob/58b91c84/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java index c30a354..5ee8b44 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java @@ -202,16 +202,18 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin for (TimelineMetric metric : metricsList){ String name = metric.getMetricName(); if (name.contains("._rate")){ - updateValueAsRate(metric.getMetricValues()); + updateValuesAsRate(metric.getMetricValues()); } } return metrics; } - private Map<Long, Double> updateValueAsRate(Map<Long, Double> metricValues) { + static Map<Long, Double> updateValuesAsRate(Map<Long, Double> metricValues) { Long prevTime = null; + Double prevVal = null; long step; + Double diff; for (Map.Entry<Long, Double> timeValueEntry : metricValues.entrySet()) { Long currTime = timeValueEntry.getKey(); @@ -219,21 +221,22 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin if (prevTime != null) { step = currTime - prevTime; - Double rate = currVal / TimeUnit.MILLISECONDS.toSeconds(step); + diff = currVal - prevVal; + Double rate = diff / TimeUnit.MILLISECONDS.toSeconds(step); timeValueEntry.setValue(rate); } else { timeValueEntry.setValue(0.0); } prevTime = currTime; + prevVal = currVal; } return metricValues; } - public static HashMap<String, List<Function>> parseMetricNamesToAggregationFunctions(List<String> metricNames) { - HashMap<String, List<Function>> metricsFunctions = new HashMap<String, - List<Function>>(); + static HashMap<String, List<Function>> parseMetricNamesToAggregationFunctions(List<String> metricNames) { + HashMap<String, List<Function>> metricsFunctions = new HashMap<>(); for (String metricName : metricNames){ Function function = Function.DEFAULT_VALUE_FUNCTION; @@ -242,7 +245,7 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin try { function = Function.fromMetricName(metricName); int functionStartIndex = metricName.indexOf("._"); - if(functionStartIndex > 0 ) { + if (functionStartIndex > 0) { cleanMetricName = metricName.substring(0, functionStartIndex); } } catch (Function.FunctionFormatException ffe){ @@ -252,7 +255,7 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin List<Function> functionsList = metricsFunctions.get(cleanMetricName); if (functionsList == null) { - functionsList = new ArrayList<Function>(1); + functionsList = new ArrayList<>(1); } functionsList.add(function); metricsFunctions.put(cleanMetricName, functionsList); http://git-wip-us.apache.org/repos/asf/ambari/blob/58b91c84/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java index 980c4af..4149e8d 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; -import com.google.common.base.Enums; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -66,7 +65,6 @@ import java.util.TreeMap; import java.util.concurrent.TimeUnit; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata.*; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATE_TABLE_SPLIT_POINTS; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATORS_SKIP_BLOCK_CACHE; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL; @@ -166,51 +164,12 @@ public class PhoenixHBaseAccessor { } private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs) - throws SQLException, IOException { + throws SQLException, IOException { TimelineMetric metric = TIMELINE_METRIC_READ_HELPER.getTimelineMetricCommonsFromResultSet(rs); metric.setMetricValues(readLastMetricValueFromJSON(rs.getString("METRICS"))); return metric; } - public static SingleValuedTimelineMetric getAggregatedTimelineMetricFromResultSet( - ResultSet rs, Function f) throws SQLException, IOException { - - SingleValuedTimelineMetric metric = new SingleValuedTimelineMetric( - rs.getString("METRIC_NAME") + f.getSuffix(), - rs.getString("APP_ID"), - rs.getString("INSTANCE_ID"), - rs.getString("HOSTNAME"), - rs.getLong("SERVER_TIME"), - rs.getLong("SERVER_TIME"), - rs.getString("UNITS") - ); - - // get functions for metricnames - - double value; - switch(f.getReadFunction()){ - case AVG: - value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT"); - break; - case MIN: - value = rs.getDouble("METRIC_MIN"); - break; - case MAX: - value = rs.getDouble("METRIC_MAX"); - break; - case SUM: - value = rs.getDouble("METRIC_SUM"); - break; - default: - value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT"); - break; - } - - metric.setSingleTimeseriesValue(rs.getLong("SERVER_TIME"), value); - - return metric; - } - private static TreeMap<Long, Double> readLastMetricValueFromJSON(String json) throws IOException { TreeMap<Long, Double> values = readMetricFromJSON(json); @@ -436,7 +395,7 @@ public class PhoenixHBaseAccessor { metricRecordStmt.setString(4, metric.getInstanceId()); metricRecordStmt.setLong(5, currentTime); metricRecordStmt.setLong(6, metric.getStartTime()); - metricRecordStmt.setString(7, metric.getType()); + metricRecordStmt.setString(7, metric.getUnits()); metricRecordStmt.setDouble(8, aggregates[0]); metricRecordStmt.setDouble(9, aggregates[1]); metricRecordStmt.setDouble(10, aggregates[2]); @@ -498,7 +457,7 @@ public class PhoenixHBaseAccessor { try { //get latest - if(condition.isPointInTime()){ + if (condition.isPointInTime()){ getLatestMetricRecords(condition, conn, metrics); } else { if (condition.getEndTime() >= condition.getStartTime()) { @@ -580,19 +539,24 @@ public class PhoenixHBaseAccessor { return metrics; } - private void appendMetricFromResultSet( - TimelineMetrics metrics, Condition condition, Map<String, - List<Function>> metricFunctions, ResultSet rs) - throws SQLException, IOException { - if (condition.getPrecision() == Precision.HOURS - || condition.getPrecision() == Precision.MINUTES - || condition.getPrecision() == Precision.DAYS) { - - String metricName = rs.getString("METRIC_NAME"); - List<Function> functions = metricFunctions.get(metricName); + /** + * Apply aggregate function to the result if supplied else get precision + * or aggregate data with default function applied. + */ + private void appendMetricFromResultSet(TimelineMetrics metrics, Condition condition, + Map<String, List<Function>> metricFunctions, + ResultSet rs) throws SQLException, IOException { + String metricName = rs.getString("METRIC_NAME"); + List<Function> functions = metricFunctions.get(metricName); + // Apply aggregation function if present + if (functions != null && !functions.isEmpty()) { + if (functions.size() > 1) { + throw new IllegalArgumentException("Multiple aggregate functions not supported."); + } for (Function f : functions) { - SingleValuedTimelineMetric metric = getAggregatedTimelineMetricFromResultSet(rs, f); + SingleValuedTimelineMetric metric = + TIMELINE_METRIC_READ_HELPER.getAggregatedTimelineMetricFromResultSet(rs, f); if (condition.isGrouped()) { metrics.addOrMergeTimelineMetric(metric); @@ -600,28 +564,35 @@ public class PhoenixHBaseAccessor { metrics.getMetrics().add(metric.getTimelineMetric()); } } - } - else { - TimelineMetric metric = TIMELINE_METRIC_READ_HELPER.getTimelineMetricFromResultSet(rs); + } else { + // No aggregation requested + if (condition.getPrecision().equals(Precision.SECONDS)) { + TimelineMetric metric = TIMELINE_METRIC_READ_HELPER.getTimelineMetricFromResultSet(rs); + if (condition.isGrouped()) { + metrics.addOrMergeTimelineMetric(metric); + } else { + metrics.getMetrics().add(metric); + } - if (condition.isGrouped()) { - metrics.addOrMergeTimelineMetric(metric); } else { - metrics.getMetrics().add(metric); + SingleValuedTimelineMetric metric = + TIMELINE_METRIC_READ_HELPER.getAggregatedTimelineMetricFromResultSet(rs, + Function.DEFAULT_VALUE_FUNCTION); + if (condition.isGrouped()) { + metrics.addOrMergeTimelineMetric(metric); + } else { + metrics.getMetrics().add(metric.getTimelineMetric()); + } } } } - private void getLatestMetricRecords( - Condition condition, Connection conn, TimelineMetrics metrics) - throws SQLException, IOException { + private void getLatestMetricRecords(Condition condition, Connection conn, + TimelineMetrics metrics) throws SQLException, IOException { validateConditionIsNotEmpty(condition); - PreparedStatement stmt; - - stmt = PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(conn, - condition); + PreparedStatement stmt = PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(conn, condition); ResultSet rs = null; try { rs = stmt.executeQuery(); @@ -1146,7 +1117,7 @@ public class PhoenixHBaseAccessor { stmt.setString(1, metadata.getMetricName()); stmt.setString(2, metadata.getAppId()); stmt.setString(3, metadata.getUnits()); - stmt.setString(4, metadata.getType().name()); + stmt.setString(4, metadata.getType()); stmt.setLong(5, metadata.getSeriesStartTime()); stmt.setBoolean(6, metadata.isSupportsAggregates()); @@ -1239,7 +1210,7 @@ public class PhoenixHBaseAccessor { metricName, appId, rs.getString("UNITS"), - Enums.getIfPresent(MetricType.class, rs.getString("TYPE")).or(MetricType.UNDEFINED), + rs.getString("TYPE"), rs.getLong("START_TIME"), rs.getBoolean("SUPPORTS_AGGREGATION") ); http://git-wip-us.apache.org/repos/asf/ambari/blob/58b91c84/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java index 8292657..6f408a5 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; +import java.util.Arrays; + /** * Is used to determine metrics aggregate table. * @@ -24,8 +26,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline * @see org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.TimelineWebServices#getTimelineMetrics */ public class Function { - public static Function DEFAULT_VALUE_FUNCTION = - new Function(ReadFunction.VALUE, null); + public static Function DEFAULT_VALUE_FUNCTION = new Function(ReadFunction.VALUE, null); private static final String SUFFIX_SEPARATOR = "\\._"; private ReadFunction readFunction = ReadFunction.VALUE; @@ -42,7 +43,13 @@ public class Function { this.postProcessingFunction = ppFunction; } - public static Function fromMetricName(String metricName){ + /** + * Segregate post processing function eg: rate from aggregate function, + * example: avg, in any order + * @param metricName metric name from request + * @return @Function + */ + public static Function fromMetricName(String metricName) { // gets postprocessing, and aggregation function // ex. Metric._rate._avg String[] parts = metricName.split(SUFFIX_SEPARATOR); @@ -50,14 +57,31 @@ public class Function { ReadFunction readFunction = ReadFunction.VALUE; PostProcessingFunction ppFunction = null; - if (parts.length == 3) { - ppFunction = PostProcessingFunction.getFunction(parts[1]); - readFunction = ReadFunction.getFunction(parts[2]); - } else if (parts.length == 2) { - ppFunction = null; - readFunction = ReadFunction.getFunction(parts[1]); + if (parts.length <= 1) { + return new Function(readFunction, null); + } + if (parts.length > 3) { + throw new IllegalArgumentException("Invalid number of functions specified."); + } + + // Parse functions + boolean isSuccessful = false; // Best effort + for (String part : parts) { + if (ReadFunction.isPresent(part)) { + readFunction = ReadFunction.getFunction(part); + isSuccessful = true; + } + if (PostProcessingFunction.isPresent(part)) { + ppFunction = PostProcessingFunction.getFunction(part); + isSuccessful = true; } + } + // Throw exception if parsing failed + if (!isSuccessful) { + throw new FunctionFormatException("Could not parse provided functions: " + + "" + Arrays.asList(parts)); + } return new Function(readFunction, ppFunction); } @@ -113,8 +137,16 @@ public class Function { return suffix; } - public static PostProcessingFunction getFunction(String functionName) throws - FunctionFormatException { + public static boolean isPresent(String functionName) { + try { + PostProcessingFunction.valueOf(functionName.toUpperCase()); + } catch (IllegalArgumentException e) { + return false; + } + return true; + } + + public static PostProcessingFunction getFunction(String functionName) throws FunctionFormatException { if (functionName == null) { return NONE; } @@ -122,8 +154,7 @@ public class Function { try { return PostProcessingFunction.valueOf(functionName.toUpperCase()); } catch (IllegalArgumentException e) { - throw new FunctionFormatException("Function should be value, avg, min, " + - "max", e); + throw new FunctionFormatException("Function should be ._rate", e); } } } @@ -145,8 +176,16 @@ public class Function { return suffix; } - public static ReadFunction getFunction(String functionName) throws - FunctionFormatException { + public static boolean isPresent(String functionName) { + try { + ReadFunction.valueOf(functionName.toUpperCase()); + } catch (IllegalArgumentException e) { + return false; + } + return true; + } + + public static ReadFunction getFunction(String functionName) throws FunctionFormatException { if (functionName == null) { return VALUE; } @@ -154,12 +193,16 @@ public class Function { return ReadFunction.valueOf(functionName.toUpperCase()); } catch (IllegalArgumentException e) { throw new FunctionFormatException( - "Function should be value, avg, min, max. Got " + functionName, e); + "Function should be sum, avg, min, max. Got " + functionName, e); } } } public static class FunctionFormatException extends IllegalArgumentException { + public FunctionFormatException(String message) { + super(message); + } + public FunctionFormatException(String message, Throwable cause) { super(message, cause); } http://git-wip-us.apache.org/repos/asf/ambari/blob/58b91c84/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java index c5e60fe..846ae92 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; +import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; @@ -46,6 +47,43 @@ public class TimelineMetricReadHelper { return metric; } + public SingleValuedTimelineMetric getAggregatedTimelineMetricFromResultSet(ResultSet rs, + Function f) throws SQLException, IOException { + + SingleValuedTimelineMetric metric = new SingleValuedTimelineMetric( + rs.getString("METRIC_NAME") + f.getSuffix(), + rs.getString("APP_ID"), + rs.getString("INSTANCE_ID"), + rs.getString("HOSTNAME"), + rs.getLong("SERVER_TIME"), + rs.getLong("SERVER_TIME"), + rs.getString("UNITS") + ); + + double value; + switch(f.getReadFunction()){ + case AVG: + value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT"); + break; + case MIN: + value = rs.getDouble("METRIC_MIN"); + break; + case MAX: + value = rs.getDouble("METRIC_MAX"); + break; + case SUM: + value = rs.getDouble("METRIC_SUM"); + break; + default: + value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT"); + break; + } + + metric.setSingleTimeseriesValue(rs.getLong("SERVER_TIME"), value); + + return metric; + } + /** * Returns common part of timeline metrics record without the values. */ http://git-wip-us.apache.org/repos/asf/ambari/blob/58b91c84/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java index 1c1a1dc..8e58203 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java @@ -36,7 +36,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata.MetricType.UNDEFINED; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DISABLE_METRIC_METADATA_MGMT; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.METRICS_METADATA_SYNC_INIT_DELAY; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.METRICS_METADATA_SYNC_SCHEDULE_DELAY; @@ -166,8 +165,8 @@ public class TimelineMetricMetadataManager { return new TimelineMetricMetadata( timelineMetric.getMetricName(), timelineMetric.getAppId(), - timelineMetric.getType(), // Present type and unit are synonyms - UNDEFINED, // TODO: Add support for types in the application + timelineMetric.getUnits(), + timelineMetric.getType(), timelineMetric.getStartTime(), true ); http://git-wip-us.apache.org/repos/asf/ambari/blob/58b91c84/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java index af9c6bb..46bc6f8 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function; +import org.junit.Ignore; import org.junit.Test; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.fromMetricName; @@ -32,17 +33,20 @@ public class FunctionTest { Function f = fromMetricName("Metric._avg"); assertThat(f).isEqualTo(new Function(AVG, null)); - f = fromMetricName("Metric._rate._avg"); assertThat(f).isEqualTo(new Function(AVG, RATE)); f = fromMetricName("bytes_in"); assertThat(f).isEqualTo(Function.DEFAULT_VALUE_FUNCTION); - } + // Rate support without aggregates + f = fromMetricName("Metric._rate"); + assertThat(f).isEqualTo(new Function(null, RATE)); + } + @Ignore // If unknown function: behavior is best effort query without function @Test(expected = Function.FunctionFormatException.class) public void testNotAFunction() throws Exception { - Function f = fromMetricName("bytes._not._afunction"); + fromMetricName("bytes._not._afunction"); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/58b91c84/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java index 8233b3f..512a7db 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java @@ -17,12 +17,15 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; +import junit.framework.Assert; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function; import org.junit.Test; import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.TreeMap; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.AVG; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.PostProcessingFunction.RATE; @@ -32,8 +35,7 @@ public class HBaseTimelineMetricStoreTest { public static final String MEM_METRIC = "mem"; public static final String BYTES_IN_METRIC = "bytes_in"; - public static final String BYTES_NOT_AFUNCTION_METRIC = "bytes._not" + - "._afunction"; + public static final String BYTES_NOT_AFUNCTION_METRIC = "bytes._not._afunction"; @Test public void testParseMetricNamesToAggregationFunctions() throws Exception { @@ -45,8 +47,8 @@ public class HBaseTimelineMetricStoreTest { BYTES_NOT_AFUNCTION_METRIC); //when - HashMap<String, List<Function>> mfm = HBaseTimelineMetricStore - .parseMetricNamesToAggregationFunctions(metricNames); + HashMap<String, List<Function>> mfm = + HBaseTimelineMetricStore.parseMetricNamesToAggregationFunctions(metricNames); //then assertThat(mfm).hasSize(3) @@ -63,4 +65,25 @@ public class HBaseTimelineMetricStoreTest { .contains(Function.DEFAULT_VALUE_FUNCTION); } + + @Test + public void testRateCalculationOnMetricsWithEqualValues() throws Exception { + Map<Long, Double> metricValues = new TreeMap<>(); + metricValues.put(1454016368371L, 1011.25); + metricValues.put(1454016428371L, 1011.25); + metricValues.put(1454016488371L, 1011.25); + metricValues.put(1454016548371L, 1011.25); + metricValues.put(1454016608371L, 1011.25); + metricValues.put(1454016668371L, 1011.25); + metricValues.put(1454016728371L, 1011.25); + + // Calculate rate + Map<Long, Double> rates = HBaseTimelineMetricStore.updateValuesAsRate(new TreeMap<>(metricValues)); + + // Make sure rate is zero + for (Map.Entry<Long, Double> rateEntry : rates.entrySet()) { + Assert.assertEquals("Rate should be zero, key = " + rateEntry.getKey() + + ", value = " + rateEntry.getValue(), 0.0, rateEntry.getValue()); + } + } }