Repository: ambari
Updated Branches:
  refs/heads/branch-2.2 c6af0197b -> 2c7aec608


http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
deleted file mode 100644
index e9c25cf..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
+++ /dev/null
@@ -1,398 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.Assert.fail;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineMetric;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
-import static org.assertj.core.api.Assertions.assertThat;
-
-public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
-  private Connection conn;
-  private PhoenixHBaseAccessor hdb;
-
-  @Before
-  public void setUp() throws Exception {
-    Logger.getLogger("org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline").setLevel(Level.DEBUG);
-    hdb = createTestableHBaseAccessor();
-    // inits connection, starts mini cluster
-    conn = getConnection(getUrl());
-
-    hdb.initMetricSchema();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    Connection conn = getConnection(getUrl());
-    Statement stmt = conn.createStatement();
-
-    stmt.execute("delete from METRIC_AGGREGATE");
-    stmt.execute("delete from METRIC_AGGREGATE_HOURLY");
-    stmt.execute("delete from METRIC_RECORD");
-    stmt.execute("delete from METRIC_RECORD_HOURLY");
-    stmt.execute("delete from METRIC_RECORD_MINUTE");
-    conn.commit();
-
-    stmt.close();
-    conn.close();
-  }
-
-  @Test
-  public void testShouldInsertMetrics() throws Exception {
-    // GIVEN
-
-    // WHEN
-    long startTime = System.currentTimeMillis();
-    TimelineMetrics metricsSent = prepareTimelineMetrics(startTime, "local");
-    hdb.insertMetricRecords(metricsSent);
-
-    Condition queryCondition = new DefaultCondition(null,
-        Collections.singletonList("local"), null, null, startTime,
-        startTime + (15 * 60 * 1000), null, null, false);
-    TimelineMetrics recordRead = hdb.getMetricRecords(queryCondition, null);
-
-    // THEN
-    assertThat(recordRead.getMetrics()).hasSize(2)
-      .extracting("metricName")
-      .containsOnly("mem_free", "disk_free");
-
-    assertThat(metricsSent.getMetrics())
-      .usingElementComparator(TIME_IGNORING_COMPARATOR)
-      .containsExactlyElementsOf(recordRead.getMetrics());
-  }
-
-  private Configuration getConfigurationForTest(boolean useGroupByAggregators) {
-    Configuration configuration = new Configuration();
-    configuration.set("timeline.metrics.service.use.groupBy.aggregators", String.valueOf(useGroupByAggregators));
-    return configuration;
-  }
-
-  @Test
-  public void testShouldAggregateMinuteProperly() throws Exception {
-    // GIVEN
-    TimelineMetricAggregator aggregatorMinute =
-      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb,
-        getConfigurationForTest(false));
-    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
-
-    long startTime = System.currentTimeMillis();
-    long ctime = startTime;
-    long minute = 60 * 1000;
-    hdb.insertMetricRecords(prepareTimelineMetrics(startTime, "local"));
-    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
-    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
-    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
-    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
-
-    // WHEN
-    long endTime = startTime + 1000 * 60 * 4;
-    boolean success = aggregatorMinute.doWork(startTime, endTime);
-
-    //THEN
-    Condition condition = new DefaultCondition(null, null, null, null, startTime,
-      endTime, null, null, true);
-    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
-      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
-      METRICS_AGGREGATE_MINUTE_TABLE_NAME));
-
-    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
-    ResultSet rs = pstmt.executeQuery();
-    MetricHostAggregate expectedAggregate =
-      MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0);
-
-    int count = 0;
-    while (rs.next()) {
-      TimelineMetric currentMetric =
-        readHelper.getTimelineMetricKeyFromResultSet(rs);
-      MetricHostAggregate currentHostAggregate =
-        readHelper.getMetricHostAggregateFromResultSet(rs);
-
-      if ("disk_free".equals(currentMetric.getMetricName())) {
-        assertEquals(2.0, currentHostAggregate.getMax());
-        assertEquals(0.0, currentHostAggregate.getMin());
-        assertEquals(20, currentHostAggregate.getNumberOfSamples());
-        assertEquals(15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
-        count++;
-      } else if ("mem_free".equals(currentMetric.getMetricName())) {
-        assertEquals(2.0, currentHostAggregate.getMax());
-        assertEquals(0.0, currentHostAggregate.getMin());
-        assertEquals(20, currentHostAggregate.getNumberOfSamples());
-        assertEquals(15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
-        count++;
-      } else {
-        fail("Unexpected entry");
-      }
-    }
-    assertEquals("Two aggregated entries expected", 2, count);
-  }
-
-  @Test
-  public void testShouldAggregateHourProperly() throws Exception {
-    // GIVEN
-    TimelineMetricAggregator aggregator =
-      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb,
-        getConfigurationForTest(false));
-    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
-    long startTime = System.currentTimeMillis();
-
-    MetricHostAggregate expectedAggregate =
-      MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0);
-    Map<TimelineMetric, MetricHostAggregate>
-      aggMap = new HashMap<TimelineMetric,
-      MetricHostAggregate>();
-
-    int min_5 = 5 * 60 * 1000;
-    long ctime = startTime - min_5;
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-
-    hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_MINUTE_TABLE_NAME);
-
-    //WHEN
-    long endTime = ctime + min_5;
-    boolean success = aggregator.doWork(startTime, endTime);
-    assertTrue(success);
-
-    //THEN
-    Condition condition = new DefaultCondition(null, null, null, null, startTime,
-      endTime, null, null, true);
-    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
-      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
-      METRICS_AGGREGATE_HOURLY_TABLE_NAME));
-
-    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
-    ResultSet rs = pstmt.executeQuery();
-
-    while (rs.next()) {
-      TimelineMetric currentMetric =
-        readHelper.getTimelineMetricKeyFromResultSet(rs);
-      MetricHostAggregate currentHostAggregate =
-        readHelper.getMetricHostAggregateFromResultSet(rs);
-
-      if ("disk_used".equals(currentMetric.getMetricName())) {
-        assertEquals(2.0, currentHostAggregate.getMax());
-        assertEquals(0.0, currentHostAggregate.getMin());
-        assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
-        assertEquals(12 * 15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
-      }
-    }
-  }
-
-  @Test
-  public void testMetricAggregateDaily() throws Exception {
-    // GIVEN
-    TimelineMetricAggregator aggregator =
-      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hdb,
-        getConfigurationForTest(false));
-    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
-    long startTime = System.currentTimeMillis();
-
-    MetricHostAggregate expectedAggregate =
-      MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0);
-    Map<TimelineMetric, MetricHostAggregate>
-      aggMap = new HashMap<TimelineMetric, MetricHostAggregate>();
-
-    int min_5 = 5 * 60 * 1000;
-    long ctime = startTime - min_5;
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-
-    hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_HOURLY_TABLE_NAME);
-
-    //WHEN
-    long endTime = ctime + min_5;
-    boolean success = aggregator.doWork(startTime, endTime);
-    assertTrue(success);
-
-    //THEN
-    Condition condition = new DefaultCondition(null, null, null, null, startTime,
-      endTime, null, null, true);
-    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
-      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
-      METRICS_AGGREGATE_DAILY_TABLE_NAME));
-
-    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
-    ResultSet rs = pstmt.executeQuery();
-
-    while (rs.next()) {
-      TimelineMetric currentMetric =
-        readHelper.getTimelineMetricKeyFromResultSet(rs);
-      MetricHostAggregate currentHostAggregate =
-        readHelper.getMetricHostAggregateFromResultSet(rs);
-
-      if ("disk_used".equals(currentMetric.getMetricName())) {
-        assertEquals(2.0, currentHostAggregate.getMax());
-        assertEquals(0.0, currentHostAggregate.getMin());
-        assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
-        assertEquals(12 * 15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
-      }
-    }
-  }
-
-  @Test
-  public void testAggregationUsingGroupByQuery() throws Exception {
-    // GIVEN
-    TimelineMetricAggregator aggregatorMinute =
-      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb,
-        getConfigurationForTest(true));
-    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
-
-    long startTime = System.currentTimeMillis();
-    long ctime = startTime;
-    long minute = 60 * 1000;
-    hdb.insertMetricRecords(prepareTimelineMetrics(startTime, "local"));
-    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
-    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
-    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
-    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
-
-    long endTime = startTime + 1000 * 60 * 4;
-    boolean success = aggregatorMinute.doWork(startTime, endTime);
-    assertTrue(success);
-
-    Condition condition = new DefaultCondition(null, null, null, null, startTime,
-      endTime, null, null, true);
-    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
-      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
-      METRICS_AGGREGATE_MINUTE_TABLE_NAME));
-
-    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
-    ResultSet rs = pstmt.executeQuery();
-    MetricHostAggregate expectedAggregate =
-      MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0);
-
-    int count = 0;
-    while (rs.next()) {
-      TimelineMetric currentMetric =
-        readHelper.getTimelineMetricKeyFromResultSet(rs);
-      MetricHostAggregate currentHostAggregate =
-        readHelper.getMetricHostAggregateFromResultSet(rs);
-
-      if ("disk_free".equals(currentMetric.getMetricName())) {
-        assertEquals(2.0, currentHostAggregate.getMax());
-        assertEquals(0.0, currentHostAggregate.getMin());
-        assertEquals(20, currentHostAggregate.getNumberOfSamples());
-        assertEquals(15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
-        count++;
-      } else if ("mem_free".equals(currentMetric.getMetricName())) {
-        assertEquals(2.0, currentHostAggregate.getMax());
-        assertEquals(0.0, currentHostAggregate.getMin());
-        assertEquals(20, currentHostAggregate.getNumberOfSamples());
-        assertEquals(15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
-        count++;
-      } else {
-        fail("Unexpected entry");
-      }
-    }
-    assertEquals("Two aggregated entries expected", 2, count);
-  }
-
-  private final static Comparator<TimelineMetric> TIME_IGNORING_COMPARATOR =
-    new Comparator<TimelineMetric>() {
-      @Override
-      public int compare(TimelineMetric o1, TimelineMetric o2) {
-        return o1.equalsExceptTime(o2) ? 0 : 1;
-      }
-    };
-
-  private TimelineMetrics prepareTimelineMetrics(long startTime, String host) {
-    TimelineMetrics metrics = new TimelineMetrics();
-    metrics.setMetrics(Arrays.asList(
-      createMetric(startTime, "disk_free", host),
-      createMetric(startTime, "mem_free", host)));
-
-    return metrics;
-  }
-
-  private TimelineMetric createMetric(long startTime, String metricName, String host) {
-    TimelineMetric m = new TimelineMetric();
-    m.setAppId("host");
-    m.setHostName(host);
-    m.setMetricName(metricName);
-    m.setStartTime(startTime);
-    TreeMap<Long, Double> vals = new TreeMap<Long, Double>();
-    vals.put(startTime + 15000l, 0.0);
-    vals.put(startTime + 30000l, 0.0);
-    vals.put(startTime + 45000l, 1.0);
-    vals.put(startTime + 60000l, 2.0);
-
-    m.setMetricValues(vals);
-
-    return m;
-  }
-
-}
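For orientation, every GIVEN/WHEN/THEN block in the deleted test above follows the same query pattern. A minimal sketch (not part of this commit; it assumes the conn fixture of AbstractMiniHBaseClusterTest, a TimelineMetricReadHelper named readHelper, and the static imports listed at the top of the file):

    private void assertHostAggregates(long startTime, long endTime) throws Exception {
      // build a time-bounded condition and bind it to the aggregate-only
      // SQL template, pointed at one of the aggregate tables
      Condition condition = new DefaultCondition(null, null, null, null,
        startTime, endTime, null, null, true);
      condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
        PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
        METRICS_AGGREGATE_MINUTE_TABLE_NAME));

      PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
      ResultSet rs = pstmt.executeQuery();
      while (rs.next()) {
        // one row per (metric, host); the key and the aggregate are read separately
        TimelineMetric metric = readHelper.getTimelineMetricKeyFromResultSet(rs);
        MetricHostAggregate aggregate = readHelper.getMetricHostAggregateFromResultSet(rs);
        // assertions on the metric name and max/min/sum/sample-count go here
      }
    }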

http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java
index 8405b49..1fc5b0e 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
 
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.ITClusterAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.ITMetricAggregator;
 import org.junit.Ignore;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
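The hunk shows only the two relocated imports; the suite's annotations sit outside it. Assuming the standard JUnit 4 wiring suggested by the RunWith, Suite and Ignore imports above, the class presumably looks roughly like this (a sketch, not the committed source):

    @RunWith(Suite.class)
    @Suite.SuiteClasses({
      ITMetricAggregator.class,
      ITClusterAggregator.class
    })
    @Ignore // the Ignore import suggests the suite is not run by default
    public class TestClusterSuite {
    }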

http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
new file mode 100644
index 0000000..2b29469
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import org.apache.hadoop.yarn.util.Clock;
+import org.junit.Before;
+import org.junit.Test;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.concurrent.atomic.AtomicLong;
+import static junit.framework.Assert.assertEquals;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE;
+
+public class AbstractTimelineAggregatorTest {
+
+  private AbstractTimelineAggregator agg;
+  TestClock clock = new TestClock();
+
+  AtomicLong startTimeInDoWork;
+  AtomicLong endTimeInDoWork;
+  AtomicLong checkPoint;
+  int actualRuns;
+
+  long sleepIntervalMillis;
+  int checkpointCutOffMultiplier;
+
+  @Before
+  public void setUp() throws Exception {
+    sleepIntervalMillis = 30000l;
+    checkpointCutOffMultiplier = 2;
+
+    Configuration metricsConf = new Configuration();
+    metricsConf.setInt(AGGREGATOR_CHECKPOINT_DELAY, 0);
+    metricsConf.setInt(RESULTSET_FETCH_SIZE, 2000);
+
+    startTimeInDoWork = new AtomicLong(0);
+    endTimeInDoWork = new AtomicLong(0);
+    checkPoint = new AtomicLong(-1);
+    actualRuns = 0;
+
+    agg = new AbstractTimelineAggregator("TimelineAggregatorTest", null, metricsConf, clock) {
+      @Override
+      public boolean doWork(long startTime, long endTime) {
+        startTimeInDoWork.set(startTime);
+        endTimeInDoWork.set(endTime);
+        actualRuns++;
+
+        return true;
+      }
+
+      @Override
+      protected Condition
+      prepareMetricQueryCondition(long startTime, long endTime) {
+        return null;
+      }
+
+      @Override
+      protected void aggregate(ResultSet rs, long startTime,
+                               long endTime) throws IOException, SQLException {
+      }
+
+      @Override
+      protected Long getSleepIntervalMillis() {
+        return sleepIntervalMillis;
+      }
+
+      @Override
+      protected Integer getCheckpointCutOffMultiplier() {
+        return checkpointCutOffMultiplier;
+      }
+
+      @Override
+      public boolean isDisabled() {
+        return false;
+      }
+
+      @Override
+      protected String getCheckpointLocation() {
+        return "dummy_ckptFile";
+      }
+
+      protected long readCheckPoint() {
+        return checkPoint.get();
+      }
+
+      @Override
+      protected void saveCheckPoint(long checkpointTime) throws IOException {
+        checkPoint.set(checkpointTime);
+      }
+    };
+
+
+  }
+
+  @Test
+  public void testDoWorkOnZeroDelay() throws Exception {
+
+    // starting at time 0;
+    clock.setTime(0);
+
+    long sleep = agg.runOnce(sleepIntervalMillis);
+    assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
+    assertEquals("endTime should be zero", 0, endTimeInDoWork.get());
+    assertEquals(0, checkPoint.get());
+    assertEquals(sleep, sleepIntervalMillis);
+    assertEquals("Do not aggregate on first run", 0, actualRuns);
+
+    // exactly one sleepInterval
+    clock.setTime(clock.getTime() + sleepIntervalMillis);
+    sleep = agg.runOnce(sleepIntervalMillis);
+    assertEquals("startTime", clock.getTime() -
+        sleepIntervalMillis,
+      startTimeInDoWork.get());
+    assertEquals("endTime", clock.getTime(),
+      endTimeInDoWork.get());
+    assertEquals(clock.getTime(), checkPoint.get());
+    assertEquals(sleep, sleepIntervalMillis);
+    assertEquals(1, actualRuns);
+
+    // exactly one sleepInterval
+    clock.setTime(clock.getTime() + sleepIntervalMillis);
+    sleep = agg.runOnce(sleepIntervalMillis);
+    assertEquals("startTime", clock.getTime() -
+        sleepIntervalMillis,
+      startTimeInDoWork.get());
+    assertEquals("endTime", clock.getTime(),
+      endTimeInDoWork.get());
+    assertEquals(clock.getTime(), checkPoint.get());
+    assertEquals(sleep, sleepIntervalMillis);
+    assertEquals(2, actualRuns);
+
+    // checkpointCutOffMultiplier x sleepInterval - should pass;
+    // it will aggregate only the first part of the whole 2x interval
+    // and sleep as usual (don't we need to skip some sleep?)
+    //
+    // effectively the checkpoint will be one interval in the past,
+    // so the next run will still have one interval to catch up on
+    clock.setTime(clock.getTime() + (checkpointCutOffMultiplier *
+      sleepIntervalMillis));
+    sleep = agg.runOnce(sleepIntervalMillis);
+    assertEquals("startTime after 2xinterval", clock.getTime() -
+        (checkpointCutOffMultiplier * sleepIntervalMillis),
+      startTimeInDoWork.get());
+    assertEquals("endTime after 2xinterval", clock.getTime() -
+        sleepIntervalMillis,
+      endTimeInDoWork.get());
+    assertEquals("checkpoint after 2xinterval", clock.getTime() -
+      sleepIntervalMillis, checkPoint.get());
+    assertEquals(sleep, sleepIntervalMillis);
+    assertEquals(3, actualRuns);
+
+    // exactly one sleepInterval after a run that lagged by one whole interval,
+    // so it will aggregate the previous interval... and sleep as usual;
+    // there is no way to keep up
+    clock.setTime(clock.getTime() + sleepIntervalMillis);
+    sleep = agg.runOnce(sleepIntervalMillis);
+    assertEquals("startTime ", clock.getTime() -
+        (checkpointCutOffMultiplier * sleepIntervalMillis),
+      startTimeInDoWork.get());
+    assertEquals("endTime  ", clock.getTime() -
+        sleepIntervalMillis,
+      endTimeInDoWork.get());
+    assertEquals("checkpoint ", clock.getTime() - sleepIntervalMillis,
+      checkPoint.get());
+    assertEquals(sleep, sleepIntervalMillis);
+    assertEquals(4, actualRuns);
+
+
+    // checkpointCutOffMultiplier x sleepInterval - in normal state should pass,
+    // but the clock lags too much, so this will not execute aggregation
+    // just update checkpoint to currentTime
+    clock.setTime(clock.getTime() + (checkpointCutOffMultiplier *
+      sleepIntervalMillis));
+    sleep = agg.runOnce(sleepIntervalMillis);
+    assertEquals(4, actualRuns);
+    assertEquals("checkpoint after too much lag is reset to " +
+        "current clock time",
+      clock.getTime(), checkPoint.get());
+    assertEquals(sleep, sleepIntervalMillis);
+  }
+
+  @Test
+  public void testDoWorkOnInterruptedRuns() throws Exception {
+    // start at some non-zero arbitrarily selected time;
+    int startingTime = 10000;
+
+    // 1.
+    clock.setTime(startingTime);
+    long timeOfFirstStep = clock.getTime();
+    long sleep = agg.runOnce(sleepIntervalMillis);
+    assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
+    assertEquals("endTime should be zero", 0, endTimeInDoWork.get());
+    assertEquals("do not aggregate on first run", 0, actualRuns);
+    assertEquals("first checkpoint set on current time", timeOfFirstStep,
+      checkPoint.get());
+    assertEquals(sleep, sleepIntervalMillis);
+
+    // 2.
+    // the doWork was fast, and sleep was interrupted (e.g. restart)
+    // Q: do we want to aggregate just part of the system? maybe we should
+    // sleep up to next cycle start!!
+    clock.setTime(timeOfFirstStep + 1);
+    long timeOfSecondStep = clock.getTime();
+    sleep = agg.runOnce(sleepIntervalMillis);
+    assertEquals("startTime should be on previous checkpoint since it did not" +
+        " run yet",
+      timeOfFirstStep, startTimeInDoWork.get());
+
+    assertEquals("endTime can be start + interval",
+      startingTime + sleepIntervalMillis,
+      endTimeInDoWork.get());
+    assertEquals("should aggregate", 1, actualRuns);
+    assertEquals("checkpoint here should be set to min(endTime,currentTime), " +
+        "it is currentTime in our scenario",
+      timeOfSecondStep, checkPoint.get());
+
+    assertEquals(sleep, sleepIntervalMillis);
+
+    // 3.
+    // and again a full sleep interval has not passed, so only a small part is aggregated
+    clock.setTime(startingTime + 2);
+    long timeOfThirdStep = clock.getTime();
+
+    sleep = agg.runOnce(sleepIntervalMillis);
+    // startTime and endTime are both in the future, which makes no sense;
+    // the query will not work!!
+    assertEquals("startTime should be previous checkpoint",
+      timeOfSecondStep, startTimeInDoWork.get());
+
+    assertEquals("endTime can be start + interval",
+      timeOfSecondStep + sleepIntervalMillis,
+      endTimeInDoWork.get());
+    assertEquals("should aggregate", 2, actualRuns);
+    assertEquals("checkpoint here should be set to min(endTime,currentTime), " +
+        "it is currentTime in our scenario",
+      timeOfThirdStep,
+      checkPoint.get());
+    assertEquals(sleep, sleepIntervalMillis);
+
+  }
+
+  private static class TestClock implements Clock {
+
+    private long time;
+
+    public void setTime(long time) {
+      this.time = time;
+    }
+
+    @Override
+    public long getTime() {
+      return time;
+    }
+  }
+}
\ No newline at end of file
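Taken together, the assertions in testDoWorkOnZeroDelay and testDoWorkOnInterruptedRuns pin down a small checkpoint protocol. A simplified model inferred from those assertions (runOnceModel is an illustrative name; the real logic lives in AbstractTimelineAggregator.runOnce and may differ in detail):

    // returns the new checkpoint value the aggregator would persist
    long runOnceModel(long now, long checkpoint, long sleepIntervalMillis,
                      int checkpointCutOffMultiplier) {
      long cutOff = checkpointCutOffMultiplier * sleepIntervalMillis;
      if (checkpoint < 0 || now - checkpoint > cutOff) {
        // first run, or lagging by more than the cut-off:
        // skip aggregation and reset the checkpoint to the current time
        return now;
      }
      // aggregate one interval starting at the old checkpoint; endTime may
      // lie in the future if the sleep was interrupted early
      long endTime = checkpoint + sleepIntervalMillis;
      // doWork(checkpoint, endTime) executes here
      return Math.min(endTime, now); // i.e. min(endTime, currentTime)
    }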

http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
new file mode 100644
index 0000000..6672dae
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
@@ -0,0 +1,677 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
+
+public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
+  private final TimelineMetricReadHelper metricReader = new TimelineMetricReadHelper(false);
+
+  private Configuration getConfigurationForTest(boolean useGroupByAggregators) {
+    Configuration configuration = new Configuration();
+    configuration.set("timeline.metrics.service.use.groupBy.aggregators", String.valueOf(useGroupByAggregators));
+    return configuration;
+  }
+
+  @Test
+  public void testShouldAggregateClusterProperly() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false));
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+      "disk_free", 1));
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+      "disk_free", 2));
+    ctime += 2*minute;
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+      "disk_free", 2));
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+      "disk_free", 1));
+
+    // WHEN
+    long endTime = ctime + minute;
+    boolean success = agg.doWork(startTime, endTime);
+
+    //THEN
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+      endTime, null, null, true);
+    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+      METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
+    int recordCount = 0;
+    while (rs.next()) {
+      TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
+      MetricClusterAggregate currentHostAggregate =
+        readHelper.getMetricClusterAggregateFromResultSet(rs);
+
+      if ("disk_free".equals(currentMetric.getMetricName())) {
+        assertEquals(2, currentHostAggregate.getNumberOfHosts());
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(1.0, currentHostAggregate.getMin());
+        assertEquals(3.0, currentHostAggregate.getSum());
+        recordCount++;
+      } else {
+        fail("Unexpected entry");
+      }
+    }
+  }
+
+  @Test
+  public void testShouldAggregateClusterIgnoringInstance() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false));
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000 * 2;
+
+    /**
+     * Here we have two nodes with two instances each:
+     *              | local1 | local2 |
+     *  instance i1 |   1    |   2    |
+     *  instance i2 |   3    |   4    |
+     *
+     */
+    // Four 1's at ctime - 100
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local1",
+      "i1", "disk_free", 1));
+    // Four 2's at ctime - 100: different host
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local2",
+      "i1", "disk_free", 2));
+    // Avoid overwrite
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local1",
+      "i2", "disk_free", 3));
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local2",
+      "i2", "disk_free", 4));
+
+    ctime += minute;
+
+    // Four 1's at ctime + 2 min
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local1",
+      "i1", "disk_free", 1));
+    // Four 1's at ctime + 2 min - different host
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local2",
+      "i1", "disk_free", 3));
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local1",
+      "i2", "disk_free", 2));
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local2",
+      "i2", "disk_free", 4));
+    // WHEN
+    long endTime = ctime + minute;
+    boolean success = agg.doWork(startTime - 1000, endTime + 1000);
+
+    //THEN
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+      endTime, null, null, true);
+    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+      METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
+    int recordCount = 0;
+    while (rs.next()) {
+      TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
+      MetricClusterAggregate currentHostAggregate =
+        readHelper.getMetricClusterAggregateFromResultSet(rs);
+
+      if ("disk_free".equals(currentMetric.getMetricName())) {
+        System.out.println("OUTPUT: " + currentMetric + " - " + currentHostAggregate);
+        assertEquals(2, currentHostAggregate.getNumberOfHosts());
+        assertEquals(5.0, Math.floor(currentHostAggregate.getSum()));
+        recordCount++;
+      } else {
+        fail("Unexpected entry");
+      }
+    }
+
+    Assert.assertEquals(5, recordCount);
+  }
+
+  @Test
+  public void testShouldAggregateDifferentMetricsOnClusterProperly() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false));
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
+
+    // here we put some metrics that will be aggregated
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+      "disk_free", 1));
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+      "disk_free", 2));
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+      "disk_used", 1));
+
+    ctime += 2*minute;
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+      "disk_free", 2));
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+      "disk_free", 1));
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+      "disk_used", 1));
+
+    // WHEN
+    long endTime = ctime + minute;
+    boolean success = agg.doWork(startTime, endTime);
+
+    //THEN
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+      endTime, null, null, true);
+    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+      METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
+    int recordCount = 0;
+    while (rs.next()) {
+      TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
+      MetricClusterAggregate currentHostAggregate =
+        readHelper.getMetricClusterAggregateFromResultSet(rs);
+
+      if ("disk_free".equals(currentMetric.getMetricName())) {
+        assertEquals(2, currentHostAggregate.getNumberOfHosts());
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(1.0, currentHostAggregate.getMin());
+        assertEquals(3.0, currentHostAggregate.getSum());
+        recordCount++;
+      } else if ("disk_used".equals(currentMetric.getMetricName())) {
+        assertEquals(1, currentHostAggregate.getNumberOfHosts());
+        assertEquals(1.0, currentHostAggregate.getMax());
+        assertEquals(1.0, currentHostAggregate.getMin());
+        assertEquals(1.0, currentHostAggregate.getSum());
+        recordCount++;
+      } else {
+        fail("Unexpected entry");
+      }
+    }
+  }
+
+  @Test
+  public void testAggregateDailyClusterMetrics() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hdb, getConfigurationForTest(false));
+
+    // this time could be virtualized, or made independent of the real clock
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long hour = 3600 * 1000;
+
+    Map<TimelineClusterMetric, MetricHostAggregate> records =
+      new HashMap<TimelineClusterMetric, MetricHostAggregate>();
+
+    records.put(createEmptyTimelineClusterMetric(ctime),
+      MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += hour),
+      MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += hour),
+      MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += hour),
+      MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0));
+
+
+    hdb.saveClusterTimeAggregateRecords(records, METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
+
+    // WHEN
+    agg.doWork(startTime, ctime + hour + 1000);
+
+    // THEN
+    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_DAILY");
+    int count = 0;
+    while (rs.next()) {
+      assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME"));
+      assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+      assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
+      assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+      assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
+      assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
+      count++;
+    }
+
+    assertEquals("One daily aggregated row expected", 1, count);
+  }
+
+  @Test
+  public void testShouldAggregateClusterOnMinuteProperly() throws Exception {
+
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long second = 1000;
+    long minute = 60*second;
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> records =
+      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+    records.put(createEmptyTimelineClusterMetric(ctime),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += second),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += second),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += second),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+
+    hdb.saveClusterAggregateRecords(records);
+    agg.doWork(startTime, ctime + second);
+    long oldCtime = ctime + second;
+
+    //Next minute
+    ctime = startTime + minute;
+
+    records.put(createEmptyTimelineClusterMetric(ctime),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += second),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += second),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += second),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+
+    hdb.saveClusterAggregateRecords(records);
+    agg.doWork(oldCtime, ctime + second);
+
+    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_MINUTE");
+    int count = 0;
+    long diff = 0;
+    while (rs.next()) {
+      assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME"));
+      assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+      assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
+      assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+      assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
+      assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
+      if (count == 0) {
+        diff+=rs.getLong("SERVER_TIME");
+      } else {
+        diff-=rs.getLong("SERVER_TIME");
+        if (diff < 0) {
+          diff*=-1;
+        }
+        assertTrue(diff == minute);
+      }
+      count++;
+    }
+
+    assertEquals("Two minute aggregated rows expected", 2, count);
+  }
+
+  @Test
+  public void testShouldAggregateClusterOnHourProperly() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(false));
+
+    // this time could be virtualized, or made independent of the real clock
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> records =
+      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+    records.put(createEmptyTimelineClusterMetric(ctime),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+
+    hdb.saveClusterAggregateRecords(records);
+
+    // WHEN
+    agg.doWork(startTime, ctime + minute);
+
+    // THEN
+    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
+    int count = 0;
+    while (rs.next()) {
+      assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME"));
+      assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+      assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
+      assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+      assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
+      assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
+      count++;
+    }
+
+    assertEquals("One hourly aggregated row expected", 1, count);
+  }
+
+  @Test
+  public void testShouldAggregateDifferentMetricsOnHourProperly() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(false));
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> records =
+      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+    records.put(createEmptyTimelineClusterMetric("disk_used", ctime),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    hdb.saveClusterAggregateRecords(records);
+
+    // WHEN
+    agg.doWork(startTime, ctime + minute);
+
+    // THEN
+    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
+    int count = 0;
+    while (rs.next()) {
+      if ("disk_used".equals(rs.getString("METRIC_NAME"))) {
+        assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+        assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
+        assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+        assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
+        assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
+      } else if ("disk_free".equals(rs.getString("METRIC_NAME"))) {
+        assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+        assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM"));
+        assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+        assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX"));
+        assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN"));
+      }
+
+      count++;
+    }
+
+    assertEquals("Two hourly aggregated rows expected", 2, count);
+  }
+
+  @Test
+  public void testAppLevelHostMetricAggregates() throws Exception {
+    Configuration conf = getConfigurationForTest(false);
+    conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1");
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, conf);
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+    hdb.insertMetricRecords(prepareSingleTimelineMetric((ctime), "local1",
+      "app1", null, "app_metric_random", 1));
+    ctime += 10;
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+      "cpu_user", 1));
+    ctime += 10;
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+      "cpu_user", 2));
+
+    // WHEN
+    long endTime = ctime + minute;
+    boolean success = agg.doWork(startTime, endTime);
+
+    //THEN
+    Condition condition = new DefaultCondition(
+      Collections.singletonList("cpu_user"), null, "app1", null,
+      startTime, endTime, null, null, true);
+    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+      METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
+      (conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
+    int recordCount = 0;
+    TimelineClusterMetric currentMetric = null;
+    MetricClusterAggregate currentHostAggregate = null;
+    while (rs.next()) {
+      currentMetric = metricReader.fromResultSet(rs);
+      currentHostAggregate = readHelper.getMetricClusterAggregateFromResultSet(rs);
+      recordCount++;
+    }
+    assertEquals(3, recordCount);
+    assertNotNull(currentMetric);
+    assertEquals("cpu_user", currentMetric.getMetricName());
+    assertEquals("app1", currentMetric.getAppId());
+    assertNotNull(currentHostAggregate);
+    assertEquals(1, currentHostAggregate.getNumberOfHosts());
+    assertEquals(1.0d, currentHostAggregate.getSum());
+  }
+
+  @Test
+  public void testClusterAggregateMetricNormalization() throws Exception {
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false));
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
+
+    // Sample data
+    TimelineMetric metric1 = new TimelineMetric();
+    metric1.setMetricName("yarn.ClusterMetrics.NumActiveNMs");
+    metric1.setAppId("resourcemanager");
+    metric1.setHostName("h1");
+    metric1.setStartTime(1431372311811l);
+    metric1.setMetricValues(new TreeMap<Long, Double>() {{
+      put(1431372311811l, 1.0);
+      put(1431372321811l, 1.0);
+      put(1431372331811l, 1.0);
+      put(1431372341811l, 1.0);
+      put(1431372351811l, 1.0);
+      put(1431372361811l, 1.0);
+      put(1431372371810l, 1.0);
+    }});
+
+    TimelineMetric metric2 = new TimelineMetric();
+    metric2.setMetricName("yarn.ClusterMetrics.NumActiveNMs");
+    metric2.setAppId("resourcemanager");
+    metric2.setHostName("h1");
+    metric2.setStartTime(1431372381810l);
+    metric2.setMetricValues(new TreeMap<Long, Double>() {{
+      put(1431372381810l, 1.0);
+      put(1431372391811l, 1.0);
+      put(1431372401811l, 1.0);
+      put(1431372411811l, 1.0);
+      put(1431372421811l, 1.0);
+      put(1431372431811l, 1.0);
+      put(1431372441810l, 1.0);
+    }});
+
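+    // metric2 continues the same series roughly 10 seconds after metric1 ends, so the
+    // aggregator is expected to normalize the two records into per-slice cluster aggregates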
+    TimelineMetrics metrics = new TimelineMetrics();
+    metrics.setMetrics(Collections.singletonList(metric1));
+    insertMetricRecords(conn, metrics, 1431372371810l);
+
+    metrics.setMetrics(Collections.singletonList(metric2));
+    insertMetricRecords(conn, metrics, 1431372441810l);
+
+    long startTime = 1431372055000l;
+    long endTime = 1431372655000l;
+
+    agg.doWork(startTime, endTime);
+
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+      endTime, null, null, true);
+    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+      METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
+    int recordCount = 0;
+    while (rs.next()) {
+      TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
+      MetricClusterAggregate currentHostAggregate = readHelper.getMetricClusterAggregateFromResultSet(rs);
+
+      if ("yarn.ClusterMetrics.NumActiveNMs".equals(currentMetric.getMetricName())) {
+        assertEquals(1, currentHostAggregate.getNumberOfHosts());
+        assertEquals(1.0, currentHostAggregate.getMax());
+        assertEquals(1.0, currentHostAggregate.getMin());
+        assertEquals(1.0, currentHostAggregate.getSum());
+        recordCount++;
+      } else {
+        fail("Unexpected entry");
+      }
+    }
+    Assert.assertEquals(5, recordCount);
+  }
+
+  @Test
+  public void testAggregationUsingGroupByQuery() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(true));
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> records =
+      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
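+    // Four minutes of cluster-aggregate records for disk_used and disk_free to roll up into one hour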
+    records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    hdb.saveClusterAggregateRecords(records);
+
+    // WHEN
+    agg.doWork(startTime, ctime + minute);
+
+    // THEN
+    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
+    int count = 0;
+    while (rs.next()) {
+      if ("disk_used".equals(rs.getString("METRIC_NAME"))) {
+        assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+        assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
+        assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+        assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
+        assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
+      } else if ("disk_free".equals(rs.getString("METRIC_NAME"))) {
+        assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+        assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM"));
+        assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+        assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX"));
+        assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN"));
+      }
+      count++;
+    }
+    assertEquals("Two hourly aggregated row expected ", 2, count);
+  }
+
+  private ResultSet executeQuery(String query) throws SQLException {
+    Connection conn = getConnection(getUrl());
+    Statement stmt = conn.createStatement();
+    return stmt.executeQuery(query);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
new file mode 100644
index 0000000..9c7c8fa
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
@@ -0,0 +1,373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineMetric;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
+
+  @Test
+  public void testShouldInsertMetrics() throws Exception {
+    // GIVEN
+
+    // WHEN
+    long startTime = System.currentTimeMillis();
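+    // Two metrics (disk_free, mem_free) with four values each are written for host "local"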
+    TimelineMetrics metricsSent = prepareTimelineMetrics(startTime, "local");
+    hdb.insertMetricRecords(metricsSent);
+
+    Condition queryCondition = new DefaultCondition(null,
+        Collections.singletonList("local"), null, null, startTime,
+        startTime + (15 * 60 * 1000), null, null, false);
+    TimelineMetrics recordRead = hdb.getMetricRecords(queryCondition, null);
+
+    // THEN
+    assertThat(recordRead.getMetrics()).hasSize(2)
+      .extracting("metricName")
+      .containsOnly("mem_free", "disk_free");
+
+    assertThat(metricsSent.getMetrics())
+      .usingElementComparator(TIME_IGNORING_COMPARATOR)
+      .containsExactlyElementsOf(recordRead.getMetrics());
+  }
+
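+  // Toggles the GROUP BY-based aggregation code path exercised by some of these tests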
+  private Configuration getConfigurationForTest(boolean useGroupByAggregators) {
+    Configuration configuration = new Configuration();
+    configuration.set("timeline.metrics.service.use.groupBy.aggregators", String.valueOf(useGroupByAggregators));
+    return configuration;
+  }
+
+  @Test
+  public void testShouldAggregateMinuteProperly() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator aggregatorMinute =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb,
+        getConfigurationForTest(false));
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
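+    // Five snapshots one minute apart: 5 inserts x 4 values = 20 samples per metric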
+    hdb.insertMetricRecords(prepareTimelineMetrics(startTime, "local"));
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+
+    // WHEN
+    long endTime = startTime + 1000 * 60 * 4;
+    boolean success = aggregatorMinute.doWork(startTime, endTime);
+
+    //THEN
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+      endTime, null, null, true);
+    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+      METRICS_AGGREGATE_MINUTE_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+    MetricHostAggregate expectedAggregate =
+      MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+
+    int count = 0;
+    while (rs.next()) {
+      TimelineMetric currentMetric =
+        readHelper.getTimelineMetricKeyFromResultSet(rs);
+      MetricHostAggregate currentHostAggregate =
+        readHelper.getMetricHostAggregateFromResultSet(rs);
+
+      if ("disk_free".equals(currentMetric.getMetricName())) {
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(0.0, currentHostAggregate.getMin());
+        assertEquals(20, currentHostAggregate.getNumberOfSamples());
+        assertEquals(15.0, currentHostAggregate.getSum());
+        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        count++;
+      } else if ("mem_free".equals(currentMetric.getMetricName())) {
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(0.0, currentHostAggregate.getMin());
+        assertEquals(20, currentHostAggregate.getNumberOfSamples());
+        assertEquals(15.0, currentHostAggregate.getSum());
+        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        count++;
+      } else {
+        fail("Unexpected entry");
+      }
+    }
+    assertEquals("Two aggregated entries expected", 2, count);
+  }
+
+  @Test
+  public void testShouldAggregateHourProperly() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator aggregator =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb,
+        getConfigurationForTest(false));
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
+    long startTime = System.currentTimeMillis();
+
+    MetricHostAggregate expectedAggregate =
+      MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+    Map<TimelineMetric, MetricHostAggregate>
+      aggMap = new HashMap<TimelineMetric,
+      MetricHostAggregate>();
+
+    int min_5 = 5 * 60 * 1000;
+    long ctime = startTime - min_5;
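+    // Twelve 5-minute aggregates saved to the minute table cover the hour being rolled up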
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+
+    hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+
+    //WHEN
+    long endTime = ctime + min_5;
+    boolean success = aggregator.doWork(startTime, endTime);
+    assertTrue(success);
+
+    //THEN
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+      endTime, null, null, true);
+    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+      METRICS_AGGREGATE_HOURLY_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
+    while (rs.next()) {
+      TimelineMetric currentMetric =
+        readHelper.getTimelineMetricKeyFromResultSet(rs);
+      MetricHostAggregate currentHostAggregate =
+        readHelper.getMetricHostAggregateFromResultSet(rs);
+
+      if ("disk_used".equals(currentMetric.getMetricName())) {
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(0.0, currentHostAggregate.getMin());
+        assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
+        assertEquals(12 * 15.0, currentHostAggregate.getSum());
+        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+      }
+    }
+  }
+
+  @Test
+  public void testMetricAggregateDaily() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator aggregator =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hdb,
+        getConfigurationForTest(false));
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
+    long startTime = System.currentTimeMillis();
+
+    MetricHostAggregate expectedAggregate =
+      MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+    Map<TimelineMetric, MetricHostAggregate>
+      aggMap = new HashMap<TimelineMetric, MetricHostAggregate>();
+
+    int min_5 = 5 * 60 * 1000;
+    long ctime = startTime - min_5;
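+    // Twelve 5-minute-spaced aggregates saved to the hourly table feed the daily rollup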
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+
+    hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_HOURLY_TABLE_NAME);
+
+    //WHEN
+    long endTime = ctime + min_5;
+    boolean success = aggregator.doWork(startTime, endTime);
+    assertTrue(success);
+
+    //THEN
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+      endTime, null, null, true);
+    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+      METRICS_AGGREGATE_DAILY_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
+    while (rs.next()) {
+      TimelineMetric currentMetric =
+        readHelper.getTimelineMetricKeyFromResultSet(rs);
+      MetricHostAggregate currentHostAggregate =
+        readHelper.getMetricHostAggregateFromResultSet(rs);
+
+      if ("disk_used".equals(currentMetric.getMetricName())) {
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(0.0, currentHostAggregate.getMin());
+        assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
+        assertEquals(12 * 15.0, currentHostAggregate.getSum());
+        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+      }
+    }
+  }
+
+  @Test
+  public void testAggregationUsingGroupByQuery() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator aggregatorMinute =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb,
+        getConfigurationForTest(true));
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
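+    // Same fixture as testShouldAggregateMinuteProperly, but with the GROUP BY aggregators enabled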
+    hdb.insertMetricRecords(prepareTimelineMetrics(startTime, "local"));
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+
+    long endTime = startTime + 1000 * 60 * 4;
+    boolean success = aggregatorMinute.doWork(startTime, endTime);
+    assertTrue(success);
+
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+      endTime, null, null, true);
+    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+      METRICS_AGGREGATE_MINUTE_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+    MetricHostAggregate expectedAggregate =
+      MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+
+    int count = 0;
+    while (rs.next()) {
+      TimelineMetric currentMetric =
+        readHelper.getTimelineMetricKeyFromResultSet(rs);
+      MetricHostAggregate currentHostAggregate =
+        readHelper.getMetricHostAggregateFromResultSet(rs);
+
+      if ("disk_free".equals(currentMetric.getMetricName())) {
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(0.0, currentHostAggregate.getMin());
+        assertEquals(20, currentHostAggregate.getNumberOfSamples());
+        assertEquals(15.0, currentHostAggregate.getSum());
+        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        count++;
+      } else if ("mem_free".equals(currentMetric.getMetricName())) {
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(0.0, currentHostAggregate.getMin());
+        assertEquals(20, currentHostAggregate.getNumberOfSamples());
+        assertEquals(15.0, currentHostAggregate.getSum());
+        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        count++;
+      } else {
+        fail("Unexpected entry");
+      }
+    }
+    assertEquals("Two aggregated entries expected", 2, count);
+  }
+
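+  // Considers two metrics equal when they match on everything except their timestamps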
+  private final static Comparator<TimelineMetric> TIME_IGNORING_COMPARATOR =
+    new Comparator<TimelineMetric>() {
+      @Override
+      public int compare(TimelineMetric o1, TimelineMetric o2) {
+        return o1.equalsExceptTime(o2) ? 0 : 1;
+      }
+    };
+
+  private TimelineMetrics prepareTimelineMetrics(long startTime, String host) {
+    TimelineMetrics metrics = new TimelineMetrics();
+    metrics.setMetrics(Arrays.asList(
+      createMetric(startTime, "disk_free", host),
+      createMetric(startTime, "mem_free", host)));
+
+    return metrics;
+  }
+
+  private TimelineMetric createMetric(long startTime, String metricName, String host) {
+    TimelineMetric m = new TimelineMetric();
+    m.setAppId("host");
+    m.setHostName(host);
+    m.setMetricName(metricName);
+    m.setStartTime(startTime);
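+    // Four values at 15-second intervals: min 0.0, max 2.0, sum 3.0 per metric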
+    TreeMap<Long, Double> vals = new TreeMap<Long, Double>();
+    vals.put(startTime + 15000l, 0.0);
+    vals.put(startTime + 30000l, 0.0);
+    vals.put(startTime + 45000l, 1.0);
+    vals.put(startTime + 60000l, 2.0);
+
+    m.setMetricValues(vals);
+
+    return m;
+  }
+
+}
