AMBARI-19858 : Add nodeCount metric in AMS. (avijayan,swagle)

(cherry picked from commit e5a7f2a50cd3a590fe1c8638b09c2fb34ef6f47c)

Change-Id: I253e2f3630379b3bc792b7215f389134564aabbf


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8cfbd7b4
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8cfbd7b4
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8cfbd7b4

Branch: refs/heads/branch-feature-BUG-74026
Commit: 8cfbd7b462f66c785ae2dbb4abb1301e2e5cd16d
Parents: 0446e4b
Author: Aravindan Vijayan <avija...@hortonworks.com>
Authored: Tue Feb 7 09:58:30 2017 -0800
Committer: Zuul <rel...@hortonworks.com>
Committed: Tue Feb 7 15:48:09 2017 -0800

----------------------------------------------------------------------
 .../metrics/timeline/PhoenixHBaseAccessor.java  |   6 +-
 .../TimelineMetricAppAggregator.java            |   4 +-
 .../TimelineMetricClusterAggregatorSecond.java  |  90 +++++++++------
 .../aggregators/TimelineMetricReadHelper.java   |   3 +-
 ...melineMetricClusterAggregatorSecondTest.java | 114 +++++++++++++++++--
 5 files changed, 162 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/8cfbd7b4/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 8d567ce..ad05025 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -153,9 +153,9 @@ public class PhoenixHBaseAccessor {
   private static final int POINTS_PER_MINUTE = 6;
   public static int RESULTSET_LIMIT = (int)TimeUnit.HOURS.toMinutes(2) * METRICS_PER_MINUTE * POINTS_PER_MINUTE ;
 
-  private static final TimelineMetricReadHelper TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper();
-  private static ObjectMapper mapper = new ObjectMapper();
-  private static TypeReference<TreeMap<Long, Double>> metricValuesTypeRef = new TypeReference<TreeMap<Long, Double>>() {};
+  static final TimelineMetricReadHelper TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper();
+  static ObjectMapper mapper = new ObjectMapper();
+  static TypeReference<TreeMap<Long, Double>> metricValuesTypeRef = new TypeReference<TreeMap<Long, Double>>() {};
 
   private final Configuration hbaseConf;
   private final Configuration metricsConf;

http://git-wip-us.apache.org/repos/asf/ambari/blob/8cfbd7b4/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
index d7b0d55..44aca03 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
@@ -48,7 +48,7 @@ public class TimelineMetricAppAggregator {
   // Lookup to check candidacy of an app
   private final List<String> appIdsToAggregate;
   private final Map<String, Set<String>> hostedAppsMap;
-  Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics;
+  Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = new HashMap<>();
   TimelineMetricMetadataManager metadataManagerInstance;
 
   public TimelineMetricAppAggregator(TimelineMetricMetadataManager metadataManager,
@@ -64,7 +64,7 @@ public class TimelineMetricAppAggregator {
    */
   public void init() {
     LOG.debug("Initializing aggregation cycle.");
-    aggregateClusterMetrics = new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+    aggregateClusterMetrics = new HashMap<>();
   }
 
   /**
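The field initializer added above is presumably a null-safety guard: TimelineMetricClusterAggregatorSecond calls appAggregator.getAggregateClusterMetrics() and does a putAll() on the result, which would throw a NullPointerException if an aggregation cycle ever read the map before init() ran. A minimal sketch of that failure mode, using a simplified stand-in class (the names below are illustrative, not the Ambari API):

import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for TimelineMetricAppAggregator; illustrative only.
class AppAggregatorSketch {
  // Initialized at declaration, so callers always see a map, never null.
  Map<String, Double> aggregateClusterMetrics = new HashMap<>();

  void init() {
    // Each aggregation cycle still starts from a fresh map.
    aggregateClusterMetrics = new HashMap<>();
  }

  Map<String, Double> getAggregateClusterMetrics() {
    return aggregateClusterMetrics;
  }

  public static void main(String[] args) {
    AppAggregatorSketch appAggregator = new AppAggregatorSketch();
    Map<String, Double> merged = new HashMap<>();
    // Without the initializer this putAll() would be an NPE whenever init()
    // had not been called yet; with it, the result is just an empty map.
    merged.putAll(appAggregator.getAggregateClusterMetrics());
    System.out.println("merged size = " + merged.size());
  }
}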

http://git-wip-us.apache.org/repos/asf/ambari/blob/8cfbd7b4/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
index 6f3d8bc..6683c0d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
@@ -18,8 +18,25 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -31,29 +48,13 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_APP_ID;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
 
 /**
  * Aggregates a metric across all hosts in the cluster. Reads metrics from
  * the precision table and saves into the aggregate.
  */
 public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggregator {
-  public Long timeSliceIntervalMillis;
+  Long timeSliceIntervalMillis;
   private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true);
   // Aggregator to perform app-level aggregates for host metrics
   private final TimelineMetricAppAggregator appAggregator;
@@ -136,7 +137,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
   /**
    * Return time slices to normalize the timeseries data.
    */
-  protected List<Long[]> getTimeSlices(long startTime, long endTime) {
+  List<Long[]> getTimeSlices(long startTime, long endTime) {
     List<Long[]> timeSlices = new ArrayList<Long[]>();
     long sliceStartTime = startTime;
     while (sliceStartTime < endTime) {
@@ -146,13 +147,13 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
     return timeSlices;
   }
 
-  private Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
-    throws SQLException, IOException {
+  Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
+      throws SQLException, IOException {
     Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
       new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
-    int numLiveHosts = 0;
 
     TimelineMetric metric = null;
+    Map<String, MutableInt> hostedAppCounter = new HashMap<>();
     if (rs.next()) {
       metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
 
@@ -167,7 +168,14 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
         } else {
           // Process the current metric
           int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
-          numLiveHosts = Math.max(numHosts, numLiveHosts);
+          if (!hostedAppCounter.containsKey(metric.getAppId())) {
+            hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
+          } else {
+            int currentHostCount = hostedAppCounter.get(metric.getAppId()).intValue();
+            if (currentHostCount < numHosts) {
+              hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
+            }
+          }
           metric = nextMetric;
         }
       }
@@ -175,15 +183,22 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
     // Process last metric
     if (metric != null) {
       int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
-      numLiveHosts = Math.max(numHosts, numLiveHosts);
+      if (!hostedAppCounter.containsKey(metric.getAppId())) {
+        hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
+      } else {
+        int currentHostCount = hostedAppCounter.get(metric.getAppId()).intValue();
+        if (currentHostCount < numHosts) {
+          hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
+        }
+      }
     }
 
     // Add app level aggregates to save
     aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics());
 
-    // Add liveHosts metric.
+    // Add liveHosts per AppId metrics.
     long timestamp = timeSlices.get(timeSlices.size() - 1)[1];
-    processLiveHostsMetric(aggregateClusterMetrics, numLiveHosts, timestamp);
+    processLiveAppCountMetrics(aggregateClusterMetrics, hostedAppCounter, timestamp);
 
     return aggregateClusterMetrics;
   }
@@ -196,7 +211,6 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
   protected int processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
                                               TimelineMetric metric, List<Long[]> timeSlices) {
     // Create time slices
-
     TimelineMetricMetadataKey appKey =  new TimelineMetricMetadataKey(metric.getMetricName(), metric.getAppId());
     TimelineMetricMetadata metricMetadata = metadataManagerInstance.getMetadataCacheValue(appKey);
 
@@ -209,8 +223,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
     int numHosts = 0;
 
     if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
-      for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
-        clusterMetrics.entrySet()) {
+      for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry : clusterMetrics.entrySet()) {
 
         TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
         Double avgValue = clusterMetricEntry.getValue();
@@ -415,16 +428,21 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
     return -1l;
   }
 
-  private void processLiveHostsMetric(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
-                                     int numLiveHosts, long timestamp) {
+  /* Add cluster metric for number of hosts that are hosting an appId */
+  private void processLiveAppCountMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
+      Map<String, MutableInt> appHostsCount, long timestamp) {
 
-    TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(
-      "live_hosts", HOST_APP_ID, null, timestamp, null);
+    for (Map.Entry<String, MutableInt> appHostsEntry : appHostsCount.entrySet()) {
+      TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(
+        "live_hosts", appHostsEntry.getKey(), null, timestamp, null);
 
-    MetricClusterAggregate metricClusterAggregate = new MetricClusterAggregate((double) numLiveHosts,
-      1, null, (double) numLiveHosts, (double) numLiveHosts);
+      Integer numOfHosts = appHostsEntry.getValue().intValue();
 
-    aggregateClusterMetrics.put(timelineClusterMetric, metricClusterAggregate);
-  }
+      MetricClusterAggregate metricClusterAggregate = new MetricClusterAggregate(
+        (double) numOfHosts, 1, null, (double) numOfHosts, (double) numOfHosts);
 
+      aggregateClusterMetrics.put(timelineClusterMetric, metricClusterAggregate);
+    }
+
+  }
 }
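The hostedAppCounter bookkeeping above is the heart of the change: for every appId the aggregator keeps the largest host count seen for any of that app's metrics in the aggregation window, then emits one "live_hosts" cluster metric per appId instead of a single cluster-wide value. A self-contained sketch of that pattern, with plain JDK types standing in for TimelineClusterMetric and MetricClusterAggregate (names and values below are illustrative only):

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the per-appId live-host counting added in this patch.
public class LiveHostsSketch {

  public static void main(String[] args) {
    // (appId, numHosts) pairs as they would be produced per processed metric.
    String[] appIds   = {"a1", "a2", "a1", "a2"};
    int[]    numHosts = { 2,    1,    1,    3 };

    // Keep the maximum host count observed per appId.
    Map<String, Integer> hostedAppCounter = new HashMap<>();
    for (int i = 0; i < appIds.length; i++) {
      Integer current = hostedAppCounter.get(appIds[i]);
      if (current == null || current < numHosts[i]) {
        hostedAppCounter.put(appIds[i], numHosts[i]);
      }
    }

    // Emit one "live_hosts" value per appId, as processLiveAppCountMetrics
    // does with TimelineClusterMetric keys in the patch.
    long timestamp = System.currentTimeMillis();
    for (Map.Entry<String, Integer> entry : hostedAppCounter.entrySet()) {
      System.out.println("live_hosts{appId=" + entry.getKey()
          + ", ts=" + timestamp + "} = " + entry.getValue());
    }
    // For this input the sketch prints live_hosts = 2 for a1 and 3 for a2.
  }
}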

http://git-wip-us.apache.org/repos/asf/ambari/blob/8cfbd7b4/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
index 7a74e24..b5f49fb 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
@@ -41,8 +41,7 @@ public class TimelineMetricReadHelper {
   public TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
       throws SQLException, IOException {
     TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs);
-    TreeMap<Long, Double> sortedByTimeMetrics =
-      PhoenixHBaseAccessor.readMetricFromJSON(rs.getString("METRICS"));
+    TreeMap<Long, Double> sortedByTimeMetrics = PhoenixHBaseAccessor.readMetricFromJSON(rs.getString("METRICS"));
     metric.setMetricValues(sortedByTimeMetrics);
     return metric;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/8cfbd7b4/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
index 58d908a..2297036 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
@@ -17,19 +17,26 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
-import junit.framework.Assert;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+import java.sql.ResultSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
-import org.easymock.EasyMock;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.Test;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
+
+import junit.framework.Assert;
 
 public class TimelineMetricClusterAggregatorSecondTest {
 
@@ -41,7 +48,7 @@ public class TimelineMetricClusterAggregatorSecondTest {
     long metricInterval = 10000l;
 
     Configuration configuration = new Configuration();
-    TimelineMetricMetadataManager metricMetadataManagerMock = EasyMock.createNiceMock(TimelineMetricMetadataManager.class);
+    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
 
     TimelineMetricClusterAggregatorSecond secondAggregator = new TimelineMetricClusterAggregatorSecond(
       METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null,
@@ -113,11 +120,11 @@ public class TimelineMetricClusterAggregatorSecondTest {
     long sliceInterval = 30000l;
 
     Configuration configuration = new Configuration();
-    TimelineMetricMetadataManager metricMetadataManagerMock = EasyMock.createNiceMock(TimelineMetricMetadataManager.class);
+    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
 
-    EasyMock.expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey)EasyMock.anyObject()))
+    expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey) anyObject()))
       .andReturn(null).anyTimes();
-    EasyMock.replay(metricMetadataManagerMock);
+    replay(metricMetadataManagerMock);
 
     TimelineMetricClusterAggregatorSecond secondAggregator = new TimelineMetricClusterAggregatorSecond(
       METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null,
@@ -312,4 +319,87 @@ public class TimelineMetricClusterAggregatorSecondTest {
 
   }
 
+  @Test
+  public void testLiveHostCounterMetrics() throws Exception {
+    long aggregatorInterval = 120000;
+    long sliceInterval = 30000;
+
+    Configuration configuration = new Configuration();
+    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
+
+    expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey) anyObject())).andReturn(null).anyTimes();
+    replay(metricMetadataManagerMock);
+
+    TimelineMetricClusterAggregatorSecond secondAggregator = new TimelineMetricClusterAggregatorSecond(
+      METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null,
+      aggregatorInterval, 2, "false", "", "", aggregatorInterval,
+      sliceInterval, null);
+
+    long now = System.currentTimeMillis();
+    long startTime = now - 120000;
+    long seconds = 1000;
+    List<Long[]> slices = secondAggregator.getTimeSlices(startTime, now);
+    ResultSet rs = createNiceMock(ResultSet.class);
+
+    TreeMap<Long, Double> metricValues = new TreeMap<>();
+    metricValues.put(startTime + 15*seconds, 1.0);
+    metricValues.put(startTime + 45*seconds, 2.0);
+    metricValues.put(startTime + 75*seconds, 3.0);
+    metricValues.put(startTime + 105*seconds, 4.0);
+
+    expect(rs.next()).andReturn(true).times(6);
+    expect(rs.next()).andReturn(false);
+
+    /*
+    m1-h1-a1
+    m2-h1-a1
+    m2-h1-a2
+    m2-h2-a1
+    m2-h2-a2
+    m2-h3-a2
+
+    So live_hosts : a1 = 2
+       live_hosts : a2 = 3
+     */
+    expect(rs.getString("METRIC_NAME")).andReturn("m1").times(1);
+    expect(rs.getString("METRIC_NAME")).andReturn("m2").times(5);
+
+    expect(rs.getString("HOSTNAME")).andReturn("h1").times(3);
+    expect(rs.getString("HOSTNAME")).andReturn("h2").times(2);
+    expect(rs.getString("HOSTNAME")).andReturn("h3").times(1);
+
+    expect(rs.getString("APP_ID")).andReturn("a1").times(2);
+    expect(rs.getString("APP_ID")).andReturn("a2").times(1);
+    expect(rs.getString("APP_ID")).andReturn("a1").times(1);
+    expect(rs.getString("APP_ID")).andReturn("a2").times(2);
+
+    expect(rs.getLong("SERVER_TIME")).andReturn(now - 150000).times(6);
+    expect(rs.getLong("START_TIME")).andReturn(now - 150000).times(6);
+    expect(rs.getString("UNITS")).andReturn(null).times(6);
+
+    ObjectMapper mapper = new ObjectMapper();
+    expect(rs.getString("METRICS")).andReturn(mapper.writeValueAsString(metricValues)).times(6);
+
+    replay(rs);
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregates = secondAggregator.aggregateMetricsFromResultSet(rs, slices);
+
+    Assert.assertNotNull(aggregates);
+
+    MetricClusterAggregate a1 = null, a2 = null;
+
+    for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> m : aggregates.entrySet()) {
+      if (m.getKey().getMetricName().equals("live_hosts") && m.getKey().getAppId().equals("a1")) {
+        a1 = m.getValue();
+      }
+      if (m.getKey().getMetricName().equals("live_hosts") && m.getKey().getAppId().equals("a2")) {
+        a2 = m.getValue();
+      }
+    }
+
+    Assert.assertNotNull(a1);
+    Assert.assertNotNull(a2);
+    Assert.assertEquals(2d, a1.getSum());
+    Assert.assertEquals(3d, a2.getSum());
+  }
 }
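The expected values in testLiveHostCounterMetrics follow directly from the counting rule: appId a1 is reported from hosts h1 and h2 (2 hosts), while a2 is reported from h1, h2 and h3 (3 hosts). A small trace that replays just that bookkeeping with plain JDK collections (it is not the EasyMock-driven test itself, only a worked example of the arithmetic):

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Replays the metric-host-app rows mocked in testLiveHostCounterMetrics to show
// why live_hosts is expected to be 2 for a1 and 3 for a2.
public class LiveHostCounterTrace {

  public static void main(String[] args) {
    // Rows in the order the mocked ResultSet returns them: metric, host, appId.
    String[][] rows = {
        {"m1", "h1", "a1"},
        {"m2", "h1", "a1"},
        {"m2", "h1", "a2"},
        {"m2", "h2", "a1"},
        {"m2", "h2", "a2"},
        {"m2", "h3", "a2"},
    };

    // Distinct hosts seen per appId; for this input it matches the aggregator,
    // which takes the maximum per-metric host count for each appId.
    Map<String, Set<String>> hostsPerApp = new LinkedHashMap<>();
    for (String[] row : rows) {
      Set<String> hosts = hostsPerApp.get(row[2]);
      if (hosts == null) {
        hosts = new TreeSet<>();
        hostsPerApp.put(row[2], hosts);
      }
      hosts.add(row[1]);
    }

    for (Map.Entry<String, Set<String>> entry : hostsPerApp.entrySet()) {
      System.out.println("live_hosts for " + entry.getKey() + " = " + entry.getValue().size());
    }
    // Prints 2 for a1 and 3 for a2, matching Assert.assertEquals(2d, a1.getSum())
    // and Assert.assertEquals(3d, a2.getSum()) in the test above.
  }
}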
