AMBARI-19858 : Add nodeCount metric in AMS. (avijayan,swagle) (cherry picked from commit e5a7f2a50cd3a590fe1c8638b09c2fb34ef6f47c)
Change-Id: I253e2f3630379b3bc792b7215f389134564aabbf Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8cfbd7b4 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8cfbd7b4 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8cfbd7b4 Branch: refs/heads/branch-feature-BUG-74026 Commit: 8cfbd7b462f66c785ae2dbb4abb1301e2e5cd16d Parents: 0446e4b Author: Aravindan Vijayan <avija...@hortonworks.com> Authored: Tue Feb 7 09:58:30 2017 -0800 Committer: Zuul <rel...@hortonworks.com> Committed: Tue Feb 7 15:48:09 2017 -0800 ---------------------------------------------------------------------- .../metrics/timeline/PhoenixHBaseAccessor.java | 6 +- .../TimelineMetricAppAggregator.java | 4 +- .../TimelineMetricClusterAggregatorSecond.java | 90 +++++++++------ .../aggregators/TimelineMetricReadHelper.java | 3 +- ...melineMetricClusterAggregatorSecondTest.java | 114 +++++++++++++++++-- 5 files changed, 162 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/8cfbd7b4/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java index 8d567ce..ad05025 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -153,9 +153,9 @@ public class PhoenixHBaseAccessor { private static final int POINTS_PER_MINUTE = 6; public static int RESULTSET_LIMIT = (int)TimeUnit.HOURS.toMinutes(2) * METRICS_PER_MINUTE * POINTS_PER_MINUTE ; - private static final TimelineMetricReadHelper TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper(); - private static ObjectMapper mapper = new ObjectMapper(); - private static TypeReference<TreeMap<Long, Double>> metricValuesTypeRef = new TypeReference<TreeMap<Long, Double>>() {}; + static final TimelineMetricReadHelper TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper(); + static ObjectMapper mapper = new ObjectMapper(); + static TypeReference<TreeMap<Long, Double>> metricValuesTypeRef = new TypeReference<TreeMap<Long, Double>>() {}; private final Configuration hbaseConf; private final Configuration metricsConf; http://git-wip-us.apache.org/repos/asf/ambari/blob/8cfbd7b4/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java index d7b0d55..44aca03 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java @@ -48,7 +48,7 @@ public class TimelineMetricAppAggregator { // Lookup to check candidacy of an app private final List<String> appIdsToAggregate; private final Map<String, Set<String>> hostedAppsMap; - Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics; + Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = new HashMap<>(); TimelineMetricMetadataManager metadataManagerInstance; public TimelineMetricAppAggregator(TimelineMetricMetadataManager metadataManager, @@ -64,7 +64,7 @@ public class TimelineMetricAppAggregator { */ public void init() { LOG.debug("Initializing aggregation cycle."); - aggregateClusterMetrics = new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); + aggregateClusterMetrics = new HashMap<>(); } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/8cfbd7b4/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java index 6f3d8bc..6683c0d 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java @@ -18,8 +18,25 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; + +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.mutable.MutableInt; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; @@ -31,29 +48,13 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; -import java.io.IOException; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_APP_ID; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS; /** * Aggregates a metric across all hosts in the cluster. Reads metrics from * the precision table and saves into the aggregate. */ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggregator { - public Long timeSliceIntervalMillis; + Long timeSliceIntervalMillis; private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true); // Aggregator to perform app-level aggregates for host metrics private final TimelineMetricAppAggregator appAggregator; @@ -136,7 +137,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre /** * Return time slices to normalize the timeseries data. */ - protected List<Long[]> getTimeSlices(long startTime, long endTime) { + List<Long[]> getTimeSlices(long startTime, long endTime) { List<Long[]> timeSlices = new ArrayList<Long[]>(); long sliceStartTime = startTime; while (sliceStartTime < endTime) { @@ -146,13 +147,13 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre return timeSlices; } - private Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices) - throws SQLException, IOException { + Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices) + throws SQLException, IOException { Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); - int numLiveHosts = 0; TimelineMetric metric = null; + Map<String, MutableInt> hostedAppCounter = new HashMap<>(); if (rs.next()) { metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs); @@ -167,7 +168,14 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre } else { // Process the current metric int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices); - numLiveHosts = Math.max(numHosts, numLiveHosts); + if (!hostedAppCounter.containsKey(metric.getAppId())) { + hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts)); + } else { + int currentHostCount = hostedAppCounter.get(metric.getAppId()).intValue(); + if (currentHostCount < numHosts) { + hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts)); + } + } metric = nextMetric; } } @@ -175,15 +183,22 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre // Process last metric if (metric != null) { int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices); - numLiveHosts = Math.max(numHosts, numLiveHosts); + if (!hostedAppCounter.containsKey(metric.getAppId())) { + hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts)); + } else { + int currentHostCount = hostedAppCounter.get(metric.getAppId()).intValue(); + if (currentHostCount < numHosts) { + hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts)); + } + } } // Add app level aggregates to save aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics()); - // Add liveHosts metric. + // Add liveHosts per AppId metrics. long timestamp = timeSlices.get(timeSlices.size() - 1)[1]; - processLiveHostsMetric(aggregateClusterMetrics, numLiveHosts, timestamp); + processLiveAppCountMetrics(aggregateClusterMetrics, hostedAppCounter, timestamp); return aggregateClusterMetrics; } @@ -196,7 +211,6 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre protected int processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics, TimelineMetric metric, List<Long[]> timeSlices) { // Create time slices - TimelineMetricMetadataKey appKey = new TimelineMetricMetadataKey(metric.getMetricName(), metric.getAppId()); TimelineMetricMetadata metricMetadata = metadataManagerInstance.getMetadataCacheValue(appKey); @@ -209,8 +223,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre int numHosts = 0; if (clusterMetrics != null && !clusterMetrics.isEmpty()) { - for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry : - clusterMetrics.entrySet()) { + for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry : clusterMetrics.entrySet()) { TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey(); Double avgValue = clusterMetricEntry.getValue(); @@ -415,16 +428,21 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre return -1l; } - private void processLiveHostsMetric(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics, - int numLiveHosts, long timestamp) { + /* Add cluster metric for number of hosts that are hosting an appId */ + private void processLiveAppCountMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics, + Map<String, MutableInt> appHostsCount, long timestamp) { - TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric( - "live_hosts", HOST_APP_ID, null, timestamp, null); + for (Map.Entry<String, MutableInt> appHostsEntry : appHostsCount.entrySet()) { + TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric( + "live_hosts", appHostsEntry.getKey(), null, timestamp, null); - MetricClusterAggregate metricClusterAggregate = new MetricClusterAggregate((double) numLiveHosts, - 1, null, (double) numLiveHosts, (double) numLiveHosts); + Integer numOfHosts = appHostsEntry.getValue().intValue(); - aggregateClusterMetrics.put(timelineClusterMetric, metricClusterAggregate); - } + MetricClusterAggregate metricClusterAggregate = new MetricClusterAggregate( + (double) numOfHosts, 1, null, (double) numOfHosts, (double) numOfHosts); + aggregateClusterMetrics.put(timelineClusterMetric, metricClusterAggregate); + } + + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/8cfbd7b4/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java index 7a74e24..b5f49fb 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java @@ -41,8 +41,7 @@ public class TimelineMetricReadHelper { public TimelineMetric getTimelineMetricFromResultSet(ResultSet rs) throws SQLException, IOException { TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs); - TreeMap<Long, Double> sortedByTimeMetrics = - PhoenixHBaseAccessor.readMetricFromJSON(rs.getString("METRICS")); + TreeMap<Long, Double> sortedByTimeMetrics = PhoenixHBaseAccessor.readMetricFromJSON(rs.getString("METRICS")); metric.setMetricValues(sortedByTimeMetrics); return metric; } http://git-wip-us.apache.org/repos/asf/ambari/blob/8cfbd7b4/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java index 58d908a..2297036 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java @@ -17,19 +17,26 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; -import junit.framework.Assert; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; + +import java.sql.ResultSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; -import org.easymock.EasyMock; +import org.codehaus.jackson.map.ObjectMapper; import org.junit.Test; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND; + +import junit.framework.Assert; public class TimelineMetricClusterAggregatorSecondTest { @@ -41,7 +48,7 @@ public class TimelineMetricClusterAggregatorSecondTest { long metricInterval = 10000l; Configuration configuration = new Configuration(); - TimelineMetricMetadataManager metricMetadataManagerMock = EasyMock.createNiceMock(TimelineMetricMetadataManager.class); + TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class); TimelineMetricClusterAggregatorSecond secondAggregator = new TimelineMetricClusterAggregatorSecond( METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, @@ -113,11 +120,11 @@ public class TimelineMetricClusterAggregatorSecondTest { long sliceInterval = 30000l; Configuration configuration = new Configuration(); - TimelineMetricMetadataManager metricMetadataManagerMock = EasyMock.createNiceMock(TimelineMetricMetadataManager.class); + TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class); - EasyMock.expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey)EasyMock.anyObject())) + expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey) anyObject())) .andReturn(null).anyTimes(); - EasyMock.replay(metricMetadataManagerMock); + replay(metricMetadataManagerMock); TimelineMetricClusterAggregatorSecond secondAggregator = new TimelineMetricClusterAggregatorSecond( METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null, @@ -312,4 +319,87 @@ public class TimelineMetricClusterAggregatorSecondTest { } + @Test + public void testLiveHostCounterMetrics() throws Exception { + long aggregatorInterval = 120000; + long sliceInterval = 30000; + + Configuration configuration = new Configuration(); + TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class); + + expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey) anyObject())).andReturn(null).anyTimes(); + replay(metricMetadataManagerMock); + + TimelineMetricClusterAggregatorSecond secondAggregator = new TimelineMetricClusterAggregatorSecond( + METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null, + aggregatorInterval, 2, "false", "", "", aggregatorInterval, + sliceInterval, null); + + long now = System.currentTimeMillis(); + long startTime = now - 120000; + long seconds = 1000; + List<Long[]> slices = secondAggregator.getTimeSlices(startTime, now); + ResultSet rs = createNiceMock(ResultSet.class); + + TreeMap<Long, Double> metricValues = new TreeMap<>(); + metricValues.put(startTime + 15*seconds, 1.0); + metricValues.put(startTime + 45*seconds, 2.0); + metricValues.put(startTime + 75*seconds, 3.0); + metricValues.put(startTime + 105*seconds, 4.0); + + expect(rs.next()).andReturn(true).times(6); + expect(rs.next()).andReturn(false); + + /* + m1-h1-a1 + m2-h1-a1 + m2-h1-a2 + m2-h2-a1 + m2-h2-a2 + m2-h3-a2 + + So live_hosts : a1 = 2 + live_hosts : a2 = 3 + */ + expect(rs.getString("METRIC_NAME")).andReturn("m1").times(1); + expect(rs.getString("METRIC_NAME")).andReturn("m2").times(5); + + expect(rs.getString("HOSTNAME")).andReturn("h1").times(3); + expect(rs.getString("HOSTNAME")).andReturn("h2").times(2); + expect(rs.getString("HOSTNAME")).andReturn("h3").times(1); + + expect(rs.getString("APP_ID")).andReturn("a1").times(2); + expect(rs.getString("APP_ID")).andReturn("a2").times(1); + expect(rs.getString("APP_ID")).andReturn("a1").times(1); + expect(rs.getString("APP_ID")).andReturn("a2").times(2); + + expect(rs.getLong("SERVER_TIME")).andReturn(now - 150000).times(6); + expect(rs.getLong("START_TIME")).andReturn(now - 150000).times(6); + expect(rs.getString("UNITS")).andReturn(null).times(6); + + ObjectMapper mapper = new ObjectMapper(); + expect(rs.getString("METRICS")).andReturn(mapper.writeValueAsString(metricValues)).times(6); + + replay(rs); + + Map<TimelineClusterMetric, MetricClusterAggregate> aggregates = secondAggregator.aggregateMetricsFromResultSet(rs, slices); + + Assert.assertNotNull(aggregates); + + MetricClusterAggregate a1 = null, a2 = null; + + for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> m : aggregates.entrySet()) { + if (m.getKey().getMetricName().equals("live_hosts") && m.getKey().getAppId().equals("a1")) { + a1 = m.getValue(); + } + if (m.getKey().getMetricName().equals("live_hosts") && m.getKey().getAppId().equals("a2")) { + a2 = m.getValue(); + } + } + + Assert.assertNotNull(a1); + Assert.assertNotNull(a2); + Assert.assertEquals(2d, a1.getSum()); + Assert.assertEquals(3d, a2.getSum()); + } }