Repository: ambari
Updated Branches:
  refs/heads/trunk 2557d9a8f -> 67c425acf


http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
new file mode 100644
index 0000000..293608e
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
+
+/**
+ * Aggregates a metric across all hosts in the cluster. Reads metrics from
+ * the precision table and saves into the aggregate.
+ */
+public class TimelineMetricClusterAggregatorMinute extends 
AbstractTimelineAggregator {
+  private static final Log LOG = 
LogFactory.getLog(TimelineMetricClusterAggregatorMinute.class);
+  public Long timeSliceIntervalMillis;
+  private TimelineMetricReadHelper timelineMetricReadHelper = new 
TimelineMetricReadHelper(true);
+  // Aggregator to perform app-level aggregates for host metrics
+  private final TimelineMetricAppAggregator appAggregator;
+
+  /**
+   * Creates the minute-level cluster aggregator.
+   *
+   * Every argument except {@code timeSliceInterval} is handed unchanged to
+   * the {@link AbstractTimelineAggregator} constructor. {@code metricsConf}
+   * is additionally used to build the app-level aggregator.
+   *
+   * @param timeSliceInterval step, in milliseconds, used when the
+   *                          aggregation window is cut into time slices
+   */
+  public TimelineMetricClusterAggregatorMinute(
+      PhoenixHBaseAccessor hBaseAccessor,
+      Configuration metricsConf,
+      String checkpointLocation,
+      Long sleepIntervalMillis,
+      Integer checkpointCutOffMultiplier,
+      String aggregatorDisabledParam,
+      String tableName,
+      String outputTableName,
+      Long nativeTimeRangeDelay,
+      Long timeSliceInterval) {
+    super(hBaseAccessor,
+      metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      aggregatorDisabledParam,
+      tableName,
+      outputTableName,
+      nativeTimeRangeDelay);
+
+    this.timeSliceIntervalMillis = timeSliceInterval;
+    this.appAggregator = new TimelineMetricAppAggregator(metricsConf);
+  }
+
+  @Override
+  protected void aggregate(ResultSet rs, long startTime, long endTime) throws 
SQLException, IOException {
+    List<Long[]> timeSlices = getTimeSlices(startTime, endTime);
+    // Initialize app aggregates for host metrics
+    appAggregator.init();
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics 
=
+      aggregateMetricsFromResultSet(rs, timeSlices);
+
+    LOG.info("Saving " + aggregateClusterMetrics.size() + " metric 
aggregates.");
+    hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
+    appAggregator.cleanup();
+  }
+
+  /**
+   * Builds the condition used to query metric rows between
+   * {@code startTime} and {@code endTime}.
+   *
+   * The statement is {@code GET_METRIC_SQL} formatted with a time range hint
+   * and {@code METRICS_RECORD_TABLE_NAME}; rows are ordered by metric name,
+   * app id, instance id and server time, with no row limit.
+   *
+   * NOTE(review): this uses the NATIVE_TIME_RANGE_DELTA constant and a fixed
+   * table name rather than the values passed to the constructor - confirm
+   * that this is intended.
+   */
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime,
+                                                  long endTime) {
+    Condition queryCondition = new DefaultCondition(
+      null, null, null, null, startTime, endTime, null, null, true);
+    queryCondition.setNoLimit();
+    queryCondition.setFetchSize(resultsetFetchSize);
+
+    String timeRangeHint = PhoenixTransactSQL.getNaiveTimeRangeHint(
+      startTime, NATIVE_TIME_RANGE_DELTA);
+    queryCondition.setStatement(
+      String.format(GET_METRIC_SQL, timeRangeHint, METRICS_RECORD_TABLE_NAME));
+
+    String[] orderByColumns =
+      { "METRIC_NAME", "APP_ID", "INSTANCE_ID", "SERVER_TIME" };
+    for (String column : orderByColumns) {
+      queryCondition.addOrderByColumn(column);
+    }
+    return queryCondition;
+  }
+
+  private List<Long[]> getTimeSlices(long startTime, long endTime) {
+    List<Long[]> timeSlices = new ArrayList<Long[]>();
+    long sliceStartTime = startTime;
+    while (sliceStartTime < endTime) {
+      timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + 
timeSliceIntervalMillis});
+      sliceStartTime += timeSliceIntervalMillis;
+    }
+    return timeSlices;
+  }
+
+  /**
+   * Reads every row of {@code rs}, slices each host metric by time and folds
+   * the per-slice values into cluster-wide aggregates.
+   *
+   * Each sliced value starts a new aggregate (sum, min and max equal to the
+   * value, host count 1) or updates the existing one for the same key. The
+   * value is also passed to the app-level aggregator, whose aggregates are
+   * merged into the returned map at the end.
+   *
+   * @param rs rows to read; consumed until exhausted
+   * @param timeSlices slices used to bucket the metric values
+   * @return cluster aggregates together with the app-level aggregates
+   */
+  private Map<TimelineClusterMetric, MetricClusterAggregate>
+      aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
+      throws SQLException, IOException {
+    Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregates =
+      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+    while (rs.next()) {
+      TimelineMetric hostMetric =
+        timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
+      Map<TimelineClusterMetric, Double> slicedValues =
+        sliceFromTimelineMetric(hostMetric, timeSlices);
+
+      // Nothing to fold in when the metric produced no sliced values
+      if (slicedValues == null || slicedValues.isEmpty()) {
+        continue;
+      }
+
+      for (Map.Entry<TimelineClusterMetric, Double> sliced :
+          slicedValues.entrySet()) {
+        TimelineClusterMetric sliceKey = sliced.getKey();
+        Double sliceValue = sliced.getValue();
+
+        MetricClusterAggregate existing = clusterAggregates.get(sliceKey);
+        if (existing != null) {
+          existing.updateSum(sliceValue);
+          existing.updateNumberOfHosts(1);
+          existing.updateMax(sliceValue);
+          existing.updateMin(sliceValue);
+        } else {
+          clusterAggregates.put(sliceKey,
+            new MetricClusterAggregate(sliceValue, 1, null,
+              sliceValue, sliceValue));
+        }
+        // Update app level aggregates
+        appAggregator.processTimelineClusterMetric(sliceKey,
+          hostMetric.getHostName(), sliceValue);
+      }
+    }
+    // Add app level aggregates to save
+    clusterAggregates.putAll(appAggregator.getAggregateClusterMetrics());
+    return clusterAggregates;
+  }
+
+  /**
+   * Buckets the data points of one metric into the given time slices and
+   * averages the points that land in the same slice.
+   *
+   * The average is the arithmetic mean (sum / count) of all non-null points
+   * of a slice. A pairwise running average ({@code (old + new) / 2}) is not
+   * used because it weights later points more heavily and depends on the
+   * iteration order once a slice holds more than two points.
+   *
+   * @param timelineMetric metric whose values are sliced
+   * @param timeSlices {@code {start, end}} pairs used to bucket the points
+   * @return map from cluster metric (metric name, app id, instance id,
+   *         slice start, type) to the mean value of that slice; points with
+   *         a null value or outside every slice are skipped. Returns
+   *         {@code null} when the metric carries no values at all.
+   */
+  private Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
+        TimelineMetric timelineMetric, List<Long[]> timeSlices) {
+
+    Map<Long, Double> metricValues = timelineMetric.getMetricValues();
+    // A missing value map is treated like an empty one
+    if (metricValues == null || metricValues.isEmpty()) {
+      return null;
+    }
+
+    // Per cluster metric: [0] = running sum, [1] = number of points
+    Map<TimelineClusterMetric, double[]> sliceTotals =
+      new HashMap<TimelineClusterMetric, double[]>();
+
+    for (Map.Entry<Long, Double> metric : metricValues.entrySet()) {
+      // TODO: investigate null values - pre filter
+      if (metric.getValue() == null) {
+        continue;
+      }
+      // NOTE(review): the key is parsed from its string form exactly as
+      // before; presumably keys are not always real Long instances at
+      // runtime - confirm before simplifying.
+      long sliceStart = getSliceTimeForMetric(timeSlices,
+                        Long.parseLong(metric.getKey().toString()));
+      if (sliceStart == -1) {
+        // Point lies outside every requested slice
+        continue;
+      }
+
+      TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
+        timelineMetric.getMetricName(),
+        timelineMetric.getAppId(),
+        timelineMetric.getInstanceId(),
+        sliceStart,
+        timelineMetric.getType());
+
+      double[] totals = sliceTotals.get(clusterMetric);
+      if (totals == null) {
+        totals = new double[2];
+        sliceTotals.put(clusterMetric, totals);
+      }
+      totals[0] += metric.getValue();
+      totals[1] += 1;
+    }
+
+    Map<TimelineClusterMetric, Double> timelineClusterMetricMap =
+      new HashMap<TimelineClusterMetric, Double>();
+    for (Map.Entry<TimelineClusterMetric, double[]> slice :
+        sliceTotals.entrySet()) {
+      double[] totals = slice.getValue();
+      timelineClusterMetricMap.put(slice.getKey(), totals[0] / totals[1]);
+    }
+
+    return timelineClusterMetricMap;
+  }
+
+  /**
+   * Looks up the time slice that contains the given timestamp.
+   *
+   * A slice {@code {start, end}} matches when
+   * {@code start <= timestamp < end}; slices are checked in list order.
+   *
+   * @return the start of the first matching slice, or -1 when no slice
+   *         contains the timestamp
+   */
+  private Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) {
+    for (Long[] slice : timeSlices) {
+      // Skip slices that end before or start after the timestamp
+      if (timestamp < slice[0] || timestamp >= slice[1]) {
+        continue;
+      }
+      return slice[0];
+    }
+    return -1L;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
new file mode 100644
index 0000000..18e5f18
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
+
+public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
+  private static final Log LOG = 
LogFactory.getLog(TimelineMetricHostAggregator.class);
+
+  /**
+   * Creates a host-level aggregator. Every argument is passed straight
+   * through to the {@link AbstractTimelineAggregator} constructor.
+   */
+  public TimelineMetricHostAggregator(
+      PhoenixHBaseAccessor hBaseAccessor,
+      Configuration metricsConf,
+      String checkpointLocation,
+      Long sleepIntervalMillis,
+      Integer checkpointCutOffMultiplier,
+      String hostAggregatorDisabledParam,
+      String tableName,
+      String outputTableName,
+      Long nativeTimeRangeDelay) {
+    super(hBaseAccessor,
+      metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      hostAggregatorDisabledParam,
+      tableName,
+      outputTableName,
+      nativeTimeRangeDelay);
+  }
+
+  @Override
+  protected void aggregate(ResultSet rs, long startTime, long endTime) throws 
IOException, SQLException {
+
+    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = 
aggregateMetricsFromResultSet(rs);
+
+    LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
+    hBaseAccessor.saveHostAggregateRecords(hostAggregateMap, outputTableName);
+  }
+
+  /**
+   * Builds the condition used to query aggregate rows between
+   * {@code startTime} and {@code endTime} from {@code tableName}.
+   *
+   * The statement is {@code GET_METRIC_AGGREGATE_ONLY_SQL} formatted with a
+   * time range hint and the table name; rows are ordered by metric name,
+   * host name, app id, instance id and server time, with no row limit.
+   */
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime,
+                                                  long endTime) {
+    Condition queryCondition = new DefaultCondition(
+      null, null, null, null, startTime, endTime, null, null, true);
+    queryCondition.setNoLimit();
+    queryCondition.setFetchSize(resultsetFetchSize);
+
+    String timeRangeHint = PhoenixTransactSQL.getNaiveTimeRangeHint(
+      startTime, nativeTimeRangeDelay);
+    queryCondition.setStatement(
+      String.format(GET_METRIC_AGGREGATE_ONLY_SQL, timeRangeHint, tableName));
+
+    String[] orderByColumns = {
+      "METRIC_NAME", "HOSTNAME", "APP_ID", "INSTANCE_ID", "SERVER_TIME"
+    };
+    for (String column : orderByColumns) {
+      queryCondition.addOrderByColumn(column);
+    }
+    return queryCondition;
+  }
+
+  /**
+   * Folds consecutive rows that describe the same metric (ignoring time)
+   * into a single host aggregate.
+   *
+   * A new aggregate is started on the first row and whenever
+   * {@code equalsExceptTime} reports that the row belongs to a different
+   * metric than the previous group; the first row of each group becomes the
+   * map key. Grouping therefore relies on the row order of {@code rs}.
+   *
+   * @param rs rows to read; consumed until exhausted
+   * @return one aggregate per group of consecutive matching rows
+   */
+  private Map<TimelineMetric, MetricHostAggregate>
+      aggregateMetricsFromResultSet(ResultSet rs)
+      throws IOException, SQLException {
+    Map<TimelineMetric, MetricHostAggregate> aggregatesByMetric =
+      new HashMap<TimelineMetric, MetricHostAggregate>();
+    TimelineMetric groupKey = null;
+    MetricHostAggregate groupAggregate = null;
+
+    while (rs.next()) {
+      TimelineMetric rowMetric =
+        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+      MetricHostAggregate rowAggregate =
+        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+      // First row, or switched over to a new metric: open a fresh aggregate
+      boolean startsNewGroup =
+        groupKey == null || !groupKey.equalsExceptTime(rowMetric);
+      if (startsNewGroup) {
+        groupKey = rowMetric;
+        groupAggregate = new MetricHostAggregate();
+        aggregatesByMetric.put(rowMetric, groupAggregate);
+      }
+
+      // Recalculate totals with the current row
+      groupAggregate.updateAggregates(rowAggregate);
+    }
+    return aggregatesByMetric;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
index 40a9648..25f8c62 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
@@ -62,5 +62,41 @@ public class TimelineMetricReadHelper {
     return metric;
   }
 
+  public MetricClusterAggregate 
getMetricClusterAggregateFromResultSet(ResultSet rs)
+      throws SQLException {
+    MetricClusterAggregate agg = new MetricClusterAggregate();
+    agg.setSum(rs.getDouble("METRIC_SUM"));
+    agg.setMax(rs.getDouble("METRIC_MAX"));
+    agg.setMin(rs.getDouble("METRIC_MIN"));
+    agg.setNumberOfHosts(rs.getInt("HOSTS_COUNT"));
+
+    agg.setDeviation(0.0);
+
+    return agg;
+  }
+
+  public MetricClusterAggregate 
getMetricClusterTimeAggregateFromResultSet(ResultSet rs)
+    throws SQLException {
+    MetricClusterAggregate agg = new MetricClusterAggregate();
+    agg.setSum(rs.getDouble("METRIC_SUM"));
+    agg.setMax(rs.getDouble("METRIC_MAX"));
+    agg.setMin(rs.getDouble("METRIC_MIN"));
+    agg.setNumberOfHosts(rs.getInt("METRIC_COUNT"));
+
+    agg.setDeviation(0.0);
+
+    return agg;
+  }
+
+
+  public TimelineClusterMetric fromResultSet(ResultSet rs) throws SQLException 
{
+    return new TimelineClusterMetric(
+      rs.getString("METRIC_NAME"),
+      rs.getString("APP_ID"),
+      ignoreInstance ? null : rs.getString("INSTANCE_ID"),
+      rs.getLong("SERVER_TIME"),
+      rs.getString("UNITS"));
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
index 636999f..a630e77 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
@@ -58,24 +58,8 @@ public class PhoenixTransactSQL {
     "INSTANCE_ID)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
     "TTL=%s, COMPRESSION='%s'";
 
-  public static final String CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL =
-    "CREATE TABLE IF NOT EXISTS METRIC_RECORD_HOURLY " +
-      "(METRIC_NAME VARCHAR, " +
-      "HOSTNAME VARCHAR, " +
-      "APP_ID VARCHAR, " +
-      "INSTANCE_ID VARCHAR, " +
-      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
-      "UNITS CHAR(20), " +
-      "METRIC_SUM DOUBLE," +
-      "METRIC_COUNT UNSIGNED_INT, " +
-      "METRIC_MAX DOUBLE," +
-      "METRIC_MIN DOUBLE CONSTRAINT pk " +
-      "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
-      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
-      "TTL=%s, COMPRESSION='%s'";
-
-  public static final String CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL =
-    "CREATE TABLE IF NOT EXISTS METRIC_RECORD_MINUTE " +
+  public static final String CREATE_METRICS_AGGREGATE_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS %s " +
       "(METRIC_NAME VARCHAR, " +
       "HOSTNAME VARCHAR, " +
       "APP_ID VARCHAR, " +
@@ -91,7 +75,7 @@ public class PhoenixTransactSQL {
       " COMPRESSION='%s'";
 
   public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL =
-    "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE " +
+    "CREATE TABLE IF NOT EXISTS %s " +
       "(METRIC_NAME VARCHAR, " +
       "APP_ID VARCHAR, " +
       "INSTANCE_ID VARCHAR, " +
@@ -105,8 +89,9 @@ public class PhoenixTransactSQL {
       "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
       "TTL=%s, COMPRESSION='%s'";
 
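+  // NOTE: HOSTS_COUNT vs METRIC_COUNT - UPSERT_CLUSTER_AGGREGATE_SQL writes a
+  // HOSTS_COUNT column, whereas time-based cluster aggregates are read with
+  // a METRIC_COUNT column (see GET_CLUSTER_AGGREGATE_TIME_SQL).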
   public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL 
=
-    "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE_HOURLY " +
+    "CREATE TABLE IF NOT EXISTS %s " +
       "(METRIC_NAME VARCHAR, " +
       "APP_ID VARCHAR, " +
       "INSTANCE_ID VARCHAR, " +
@@ -139,7 +124,7 @@ public class PhoenixTransactSQL {
     "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
 
   public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " +
-    "METRIC_AGGREGATE (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
+    "%s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
     "UNITS, " +
     "METRIC_SUM, " +
     "HOSTS_COUNT, " +
@@ -156,7 +141,6 @@ public class PhoenixTransactSQL {
     "METRIC_MIN) " +
     "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
 
-
   public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " +
     "%s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
     "SERVER_TIME, " +
@@ -198,25 +182,29 @@ public class PhoenixTransactSQL {
     "METRIC_MIN " +
     "FROM %s";
 
-  public static final String GET_CLUSTER_AGGREGATE_HOURLY_SQL = "SELECT %s " +
-      "METRIC_NAME, APP_ID, " +
-      "INSTANCE_ID, SERVER_TIME, " +
-      "UNITS, " +
-      "METRIC_SUM, " +
-      "METRIC_COUNT, " +
-      "METRIC_MAX, " +
-      "METRIC_MIN " +
-      "FROM %s";
+  public static final String GET_CLUSTER_AGGREGATE_TIME_SQL = "SELECT %s " +
+    "METRIC_NAME, APP_ID, " +
+    "INSTANCE_ID, SERVER_TIME, " +
+    "UNITS, " +
+    "METRIC_SUM, " +
+    "METRIC_COUNT, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN " +
+    "FROM %s";
 
   public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD";
   public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME =
     "METRIC_RECORD_MINUTE";
   public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME =
     "METRIC_RECORD_HOURLY";
+  public static final String METRICS_AGGREGATE_DAILY_TABLE_NAME =
+    "METRIC_RECORD_DAILY";
   public static final String METRICS_CLUSTER_AGGREGATE_TABLE_NAME =
     "METRIC_AGGREGATE";
   public static final String METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME =
     "METRIC_AGGREGATE_HOURLY";
+  public static final String METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME =
+    "METRIC_AGGREGATE_DAILY";
   public static final String DEFAULT_TABLE_COMPRESSION = "SNAPPY";
   public static final String DEFAULT_ENCODING = "FAST_DIFF";
   public static final long NATIVE_TIME_RANGE_DELTA = 120000; // 2 minutes
@@ -250,7 +238,11 @@ public class PhoenixTransactSQL {
         long endTime = condition.getEndTime() == null ? 
System.currentTimeMillis() : condition.getEndTime();
         long startTime = condition.getStartTime() == null ? 0 : 
condition.getStartTime();
         Long timeRange = endTime - startTime;
-        if (timeRange > 5 * DAY) {
+        if (timeRange > 7 * DAY) {
+          metricsTable = METRICS_AGGREGATE_DAILY_TABLE_NAME;
+          query = GET_METRIC_AGGREGATE_ONLY_SQL;
+          condition.setPrecision(Precision.DAYS);
+        } else if (timeRange < 7 * DAY && timeRange > DAY) {
           metricsTable = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
           query = GET_METRIC_AGGREGATE_ONLY_SQL;
           condition.setPrecision(Precision.HOURS);
@@ -265,6 +257,10 @@ public class PhoenixTransactSQL {
         }
       } else {
         switch (condition.getPrecision()) {
+          case DAYS:
+            metricsTable = METRICS_AGGREGATE_DAILY_TABLE_NAME;
+            query = GET_METRIC_AGGREGATE_ONLY_SQL;
+            break;
           case HOURS:
             metricsTable = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
             query = GET_METRIC_AGGREGATE_ONLY_SQL;
@@ -462,9 +458,13 @@ public class PhoenixTransactSQL {
       long endTime = condition.getEndTime() == null ? 
System.currentTimeMillis() : condition.getEndTime();
       long startTime = condition.getStartTime() == null ? 0 : 
condition.getStartTime();
       Long timeRange = endTime - startTime;
-      if (timeRange > 5 * DAY) {
+      if (timeRange > 7 * DAY) {
+        metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
+        queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL;
+        condition.setPrecision(Precision.DAYS);
+      } else if (timeRange < 7 * DAY && timeRange > DAY) {
         metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
-        queryStmt = GET_CLUSTER_AGGREGATE_HOURLY_SQL;
+        queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL;
         condition.setPrecision(Precision.HOURS);
       } else {
         metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
@@ -473,9 +473,13 @@ public class PhoenixTransactSQL {
       }
     } else {
       switch (condition.getPrecision()) {
+        case DAYS:
+          metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
+          queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL;
+          break;
         case HOURS:
           metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
-          queryStmt = GET_CLUSTER_AGGREGATE_HOURLY_SQL;
+          queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL;
           break;
         default:
           metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
@@ -483,6 +487,10 @@ public class PhoenixTransactSQL {
       }
     }
 
+    queryStmt = String.format(queryStmt,
+      getNaiveTimeRangeHint(condition.getStartTime(), NATIVE_TIME_RANGE_DELTA),
+      metricsAggregateTable);
+
     StringBuilder sb = new StringBuilder(queryStmt);
     sb.append(" WHERE ");
     sb.append(condition.getConditionClause());
@@ -491,9 +499,7 @@ public class PhoenixTransactSQL {
       sb.append(" LIMIT ").append(condition.getLimit());
     }
 
-    String query = String.format(sb.toString(),
-      PhoenixTransactSQL.getNaiveTimeRangeHint(condition.getStartTime(),
-        NATIVE_TIME_RANGE_DELTA), metricsAggregateTable);
+    String query = sb.toString();
     if (LOG.isDebugEnabled()) {
       LOG.debug("SQL => " + query + ", condition => " + condition);
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
index 90c03e4..6cfaa2e 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
@@ -19,6 +19,7 @@ package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.IntegrationTestingUtility;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConnectionProvider;
 import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
 import org.apache.phoenix.query.BaseTest;
@@ -49,11 +50,11 @@ public abstract class AbstractMiniHBaseClusterTest extends 
BaseTest {
   @BeforeClass
   public static void doSetup() throws Exception {
     Map<String, String> props = getDefaultProps();
+    props.put(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, "false");
     props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000));
     props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100));
     // Make a small batch size to test multiple calls to reserve sequences
-    props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB,
-      Long.toString(BATCH_SIZE));
+    props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, 
Long.toString(BATCH_SIZE));
     // Must update config before starting server
     setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
index 27e9d67..aa276f3 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
@@ -20,17 +20,18 @@ package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 
 import org.apache.hadoop.conf.Configuration;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetricReader;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricClusterAggregator;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricClusterAggregatorHourly;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -39,6 +40,7 @@ import java.sql.Statement;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertNotNull;
 import static junit.framework.Assert.fail;
@@ -46,14 +48,14 @@ import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
 
 public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   private Connection conn;
   private PhoenixHBaseAccessor hdb;
-  private final TimelineClusterMetricReader metricReader = new
-    TimelineClusterMetricReader(false);
+  private final TimelineMetricReadHelper metricReader = new 
TimelineMetricReadHelper(false);
 
   @Before
   public void setUp() throws Exception {
@@ -83,8 +85,9 @@ public class ITClusterAggregator extends 
AbstractMiniHBaseClusterTest {
   @Test
   public void testShouldAggregateClusterProperly() throws Exception {
     // GIVEN
-    TimelineMetricClusterAggregator agg =
-      new TimelineMetricClusterAggregator(hdb, new Configuration());
+    TimelineMetricAggregator agg =
+      
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new 
Configuration());
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime;
@@ -118,7 +121,7 @@ public class ITClusterAggregator extends 
AbstractMiniHBaseClusterTest {
     while (rs.next()) {
       TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
       MetricClusterAggregate currentHostAggregate =
-        PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs);
+        readHelper.getMetricClusterAggregateFromResultSet(rs);
 
       if ("disk_free".equals(currentMetric.getMetricName())) {
         assertEquals(2, currentHostAggregate.getNumberOfHosts());
@@ -136,8 +139,9 @@ public class ITClusterAggregator extends 
AbstractMiniHBaseClusterTest {
   @Test
   public void testShouldAggregateClusterIgnoringInstance() throws Exception {
     // GIVEN
-    TimelineMetricClusterAggregator agg =
-      new TimelineMetricClusterAggregator(hdb, new Configuration());
+    TimelineMetricAggregator agg =
+      
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new 
Configuration());
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime;
@@ -188,7 +192,7 @@ public class ITClusterAggregator extends 
AbstractMiniHBaseClusterTest {
       TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
 //        PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet(rs);
       MetricClusterAggregate currentHostAggregate =
-        PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs);
+        readHelper.getMetricClusterAggregateFromResultSet(rs);
 
       if ("disk_free".equals(currentMetric.getMetricName())) {
         System.out.println("OUTPUT: " + currentMetric+" - " +
@@ -205,11 +209,11 @@ public class ITClusterAggregator extends 
AbstractMiniHBaseClusterTest {
   }
 
   @Test
-  public void testShouldAggregateDifferentMetricsOnClusterProperly()
-    throws Exception {
+  public void testShouldAggregateDifferentMetricsOnClusterProperly() throws 
Exception {
     // GIVEN
-    TimelineMetricClusterAggregator agg =
-      new TimelineMetricClusterAggregator(hdb, new Configuration());
+    TimelineMetricAggregator agg =
+      
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new 
Configuration());
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     // here we put some metrics tha will be aggregated
     long startTime = System.currentTimeMillis();
@@ -249,7 +253,7 @@ public class ITClusterAggregator extends 
AbstractMiniHBaseClusterTest {
     while (rs.next()) {
       TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
       MetricClusterAggregate currentHostAggregate =
-        PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs);
+        readHelper.getMetricClusterAggregateFromResultSet(rs);
 
       if ("disk_free".equals(currentMetric.getMetricName())) {
         assertEquals(2, currentHostAggregate.getNumberOfHosts());
@@ -269,12 +273,56 @@ public class ITClusterAggregator extends 
AbstractMiniHBaseClusterTest {
     }
   }
 
+  @Test
+  public void testAggregateDailyClusterMetrics() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator agg =
+      
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hdb, new 
Configuration());
+
+    // this time can be virtualized! or made independent from real clock
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long hour = 3600 * 1000;
+
+    Map<TimelineClusterMetric, MetricHostAggregate> records =
+      new HashMap<TimelineClusterMetric, MetricHostAggregate>();
+
+    records.put(createEmptyTimelineClusterMetric(ctime),
+      MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += hour),
+      MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += hour),
+      MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += hour),
+      MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0));
+
+
+    hdb.saveClusterTimeAggregateRecords(records, 
METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
+
+    // WHEN
+    agg.doWork(startTime, ctime + hour + 1000);
+
+    // THEN
+    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_DAILY");
+    int count = 0;
+    while (rs.next()) {
+      assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME"));
+      assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+      assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
+      assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+      assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
+      assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
+      count++;
+    }
+
+    assertEquals("Day aggregated row expected ", 1, count);
+  }
 
   @Test
   public void testShouldAggregateClusterOnHourProperly() throws Exception {
     // GIVEN
-    TimelineMetricClusterAggregatorHourly agg =
-      new TimelineMetricClusterAggregatorHourly(hdb, new Configuration());
+    TimelineMetricAggregator agg =
+      
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, new 
Configuration());
 
     // this time can be virtualized! or made independent from real clock
     long startTime = System.currentTimeMillis();
@@ -315,11 +363,10 @@ public class ITClusterAggregator extends 
AbstractMiniHBaseClusterTest {
   }
 
   @Test
-  public void testShouldAggregateDifferentMetricsOnHourProperly() throws
-    Exception {
+  public void testShouldAggregateDifferentMetricsOnHourProperly() throws 
Exception {
     // GIVEN
-    TimelineMetricClusterAggregatorHourly agg =
-      new TimelineMetricClusterAggregatorHourly(hdb, new Configuration());
+    TimelineMetricAggregator agg =
+      
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, new 
Configuration());
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime;
@@ -381,7 +428,9 @@ public class ITClusterAggregator extends 
AbstractMiniHBaseClusterTest {
   public void testAppLevelHostMetricAggregates() throws Exception {
     Configuration conf = new Configuration();
     conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1");
-    TimelineMetricClusterAggregator agg = new 
TimelineMetricClusterAggregator(hdb, conf);
+    TimelineMetricAggregator agg =
+      
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, 
conf);
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime;
@@ -416,7 +465,7 @@ public class ITClusterAggregator extends 
AbstractMiniHBaseClusterTest {
     MetricClusterAggregate currentHostAggregate = null;
     while (rs.next()) {
       currentMetric = metricReader.fromResultSet(rs);
-      currentHostAggregate = 
PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs);
+      currentHostAggregate = 
readHelper.getMetricClusterAggregateFromResultSet(rs);
       recordCount++;
     }
     assertEquals(4, recordCount);

http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
index 66420d9..f4f2223 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
@@ -29,6 +29,7 @@ import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -37,15 +38,17 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
+
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertTrue;
 import static junit.framework.Assert.fail;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineMetric;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
 import static org.assertj.core.api.Assertions.assertThat;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineMetric;
 
 public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
   private Connection conn;
@@ -102,10 +105,8 @@ public class ITMetricAggregator extends 
AbstractMiniHBaseClusterTest {
   @Test
   public void testShouldAggregateMinuteProperly() throws Exception {
     // GIVEN
-//    TimelineMetricAggregatorMinute aggregatorMinute =
-//      new TimelineMetricAggregatorMinute(hdb, new Configuration());
-    TimelineMetricAggregator aggregatorMinute = TimelineMetricAggregatorFactory
-      .createTimelineMetricAggregatorMinute(hdb, new Configuration());
+    TimelineMetricAggregator aggregatorMinute =
+      
TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb, new 
Configuration());
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime;
@@ -162,17 +163,14 @@ public class ITMetricAggregator extends 
AbstractMiniHBaseClusterTest {
   }
 
   @Test
-  public void testShouldAggregateHourProperly() throws Exception {
+   public void testShouldAggregateHourProperly() throws Exception {
     // GIVEN
-//    TimelineMetricAggregatorHourly aggregator =
-//      new TimelineMetricAggregatorHourly(hdb, new Configuration());
-
-    TimelineMetricAggregator aggregator = TimelineMetricAggregatorFactory
-      .createTimelineMetricAggregatorHourly(hdb, new Configuration());
+    TimelineMetricAggregator aggregator =
+      
TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb, new 
Configuration());
     long startTime = System.currentTimeMillis();
 
     MetricHostAggregate expectedAggregate =
-        MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+      MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0);
     Map<TimelineMetric, MetricHostAggregate>
       aggMap = new HashMap<TimelineMetric,
       MetricHostAggregate>();
@@ -226,6 +224,66 @@ public class ITMetricAggregator extends 
AbstractMiniHBaseClusterTest {
     }
   }
 
+  @Test
+  public void testMetricAggregateDaily() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator aggregator =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hdb, 
new Configuration());
+    long startTime = System.currentTimeMillis();
+
+    MetricHostAggregate expectedAggregate =
+      MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+    Map<TimelineMetric, MetricHostAggregate>
+      aggMap = new HashMap<TimelineMetric, MetricHostAggregate>();
+
+    int min_5 = 5 * 60 * 1000;
+    long ctime = startTime - min_5;
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+
+    hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_HOURLY_TABLE_NAME);
+
+    //WHEN
+    long endTime = ctime + min_5;
+    boolean success = aggregator.doWork(startTime, endTime);
+    assertTrue(success);
+
+    //THEN
+    Condition condition = new DefaultCondition(null, null, null, null, 
startTime,
+      endTime, null, null, true);
+    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, 
NATIVE_TIME_RANGE_DELTA),
+      METRICS_AGGREGATE_DAILY_TABLE_NAME));
+
+    PreparedStatement pstmt = 
PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
+    while (rs.next()) {
+      TimelineMetric currentMetric =
+        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+      MetricHostAggregate currentHostAggregate =
+        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+      if ("disk_used".equals(currentMetric.getMetricName())) {
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(0.0, currentHostAggregate.getMin());
+        assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
+        assertEquals(12 * 15.0, currentHostAggregate.getSum());
+        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+      }
+    }
+  }
+
   private final static Comparator<TimelineMetric> TIME_IGNORING_COMPARATOR =
     new Comparator<TimelineMetric>() {
       @Override

http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
index 11c4d0f..4c56f77 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
@@ -26,15 +26,11 @@ import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricClusterAggregator;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricClusterAggregatorHourly;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.SQLException;
@@ -44,14 +40,14 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertTrue;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineMetric;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createMetricHostAggregate;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
 
 
 public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
@@ -119,8 +115,8 @@ public class ITPhoenixHBaseAccessor extends 
AbstractMiniHBaseClusterTest {
   @Test
   public void testGetMetricRecordsMinutes() throws IOException, SQLException {
     // GIVEN
-    TimelineMetricAggregator aggregatorMinute = TimelineMetricAggregatorFactory
-        .createTimelineMetricAggregatorMinute(hdb, new Configuration());
+    TimelineMetricAggregator aggregatorMinute =
+      
TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb, new 
Configuration());
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime;
@@ -156,8 +152,8 @@ public class ITPhoenixHBaseAccessor extends 
AbstractMiniHBaseClusterTest {
   @Test
   public void testGetMetricRecordsHours() throws IOException, SQLException {
     // GIVEN
-    TimelineMetricAggregator aggregator = TimelineMetricAggregatorFactory
-        .createTimelineMetricAggregatorHourly(hdb, new Configuration());
+    TimelineMetricAggregator aggregator =
+      
TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb, new 
Configuration());
 
     MetricHostAggregate expectedAggregate =
         createMetricHostAggregate(2.0, 0.0, 20, 15.0);
@@ -207,8 +203,8 @@ public class ITPhoenixHBaseAccessor extends 
AbstractMiniHBaseClusterTest {
   @Test
   public void testGetClusterMetricRecordsSeconds() throws Exception {
     // GIVEN
-    TimelineMetricClusterAggregator agg =
-        new TimelineMetricClusterAggregator(hdb, new Configuration());
+    TimelineMetricAggregator agg =
+      
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new 
Configuration());
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime + 1;
@@ -246,8 +242,8 @@ public class ITPhoenixHBaseAccessor extends 
AbstractMiniHBaseClusterTest {
   @Test
   public void testGetClusterMetricRecordsHours() throws Exception {
     // GIVEN
-    TimelineMetricClusterAggregatorHourly agg =
-        new TimelineMetricClusterAggregatorHourly(hdb, new Configuration());
+    TimelineMetricAggregator agg =
+      
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, new 
Configuration());
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime;
@@ -286,9 +282,7 @@ public class ITPhoenixHBaseAccessor extends 
AbstractMiniHBaseClusterTest {
     assertEquals(2.0, metric.getMetricValues().values().iterator().next(), 
0.00001);
   }
 
-  private Map<String, List<Function>> singletonValueFunctionMap(String
-                                                                  metricName) {
-    return Collections.singletonMap(metricName, Collections.singletonList
-      (new Function()));
+  private Map<String, List<Function>> singletonValueFunctionMap(String 
metricName) {
+    return Collections.singletonMap(metricName, Collections.singletonList(new 
Function()));
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
index bdcd6df..26771d7 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
@@ -28,11 +28,8 @@ import java.util.Map;
 
 public class MetricTestHelper {
 
-  public static MetricHostAggregate
-  createMetricHostAggregate(double max, double min, int numberOfSamples,
-                            double sum) {
-    MetricHostAggregate expectedAggregate =
-        new MetricHostAggregate();
+  public static MetricHostAggregate createMetricHostAggregate(double max, 
double min, int numberOfSamples, double sum) {
+    MetricHostAggregate expectedAggregate = new MetricHostAggregate();
     expectedAggregate.setMax(max);
     expectedAggregate.setMin(min);
     expectedAggregate.setNumberOfSamples(numberOfSamples);

http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
index d6c1814..44f48e8 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
@@ -53,8 +53,8 @@ public class TestMetricHostAggregate {
     assertThat(aggregate.getAvg()).isEqualTo((3.0 + 8.0 + 1.0) / 5);
   }
 
-  private MetricHostAggregate createAggregate
-    (double sum, double min, double max, int samplesCount) {
+  static MetricHostAggregate createAggregate (Double sum, Double min,
+                                              Double max, Integer 
samplesCount) {
     MetricHostAggregate aggregate = new MetricHostAggregate();
     aggregate.setSum(sum);
     aggregate.setMax(max);

http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
 
b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
index 74c4304..fc52e5a 100644
--- 
a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
+++ 
b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
@@ -53,6 +53,14 @@
     </description>
   </property>
   <property>
+    <name>timeline.metrics.host.aggregator.minute.interval</name>
+    <value>120</value>
+    <description>
+      Time in seconds to sleep for the minute resolution host based
+      aggregator. Default resolution is 2 minutes.
+    </description>
+  </property>
+  <property>
     <name>timeline.metrics.host.aggregator.hourly.interval</name>
     <value>3600</value>
     <description>
@@ -61,11 +69,11 @@
     </description>
   </property>
   <property>
-    <name>timeline.metrics.host.aggregator.minute.interval</name>
-    <value>120</value>
+    <name>timeline.metrics.host.aggregator.daily.interval</name>
+    <value>86400</value>
     <description>
-      Time in seconds to sleep for the minute resolution host based
-      aggregator. Default resolution is 2 minutes.
+      Time in seconds to sleep for the day resolution host based
+      aggregator. Default resolution is 24 hours.
     </description>
   </property>
   <property>
@@ -77,6 +85,14 @@
     </description>
   </property>
   <property>
+    <name>timeline.metrics.cluster.aggregator.daily.interval</name>
+    <value>86400</value>
+    <description>
+      Time in seconds to sleep for the day resolution cluster wide
+      aggregator. Default is 24 hours.
+    </description>
+  </property>
+  <property>
     <name>timeline.metrics.cluster.aggregator.minute.interval</name>
     <value>120</value>
     <description>
@@ -85,6 +101,15 @@
     </description>
   </property>
   <property>
+    
<name>timeline.metrics.host.aggregator.daily.checkpointCutOffMultiplier</name>
+    <value>1</value>
+    <description>
+      Multiplier value * interval = Max allowed checkpoint lag. Effectively
+      if aggregator checkpoint is greater than max allowed checkpoint delay,
+      the checkpoint will be discarded by the aggregator.
+    </description>
+  </property>
+  <property>
     
<name>timeline.metrics.host.aggregator.hourly.checkpointCutOffMultiplier</name>
     <value>2</value>
     <description>
@@ -121,6 +146,22 @@
     </description>
   </property>
   <property>
+    
<name>timeline.metrics.cluster.aggregator.daily.checkpointCutOffMultiplier</name>
+    <value>1</value>
+    <description>
+      Multiplier value * interval = Max allowed checkpoint lag. Effectively
+      if aggregator checkpoint is greater than max allowed checkpoint delay,
+      the checkpoint will be discarded by the aggregator.
+    </description>
+  </property>
+  <property>
+    <name>timeline.metrics.host.aggregator.daily.disabled</name>
+    <value>false</value>
+    <description>
+      Disable host based daily aggregations.
+    </description>
+  </property>
+  <property>
     <name>timeline.metrics.host.aggregator.hourly.disabled</name>
     <value>false</value>
     <description>
@@ -135,6 +176,13 @@
     </description>
   </property>
   <property>
+    <name>timeline.metrics.cluster.aggregator.daily.disabled</name>
+    <value>false</value>
+    <description>
+      Disable cluster based daily aggregations.
+    </description>
+  </property>
+  <property>
     <name>timeline.metrics.cluster.aggregator.hourly.disabled</name>
     <value>false</value>
     <description>
@@ -156,6 +204,13 @@
     </description>
   </property>
   <property>
+    <name>timeline.metrics.host.aggregator.daily.ttl</name>
+    <value>31536000</value>
+    <description>
+      Host based daily resolution data purge interval. Default is 1 year.
+    </description>
+  </property>
+  <property>
     <name>timeline.metrics.host.aggregator.hourly.ttl</name>
     <value>2592000</value>
     <description>
@@ -184,6 +239,13 @@
     </description>
   </property>
   <property>
+    <name>timeline.metrics.cluster.aggregator.daily.ttl</name>
+    <value>63072000</value>
+    <description>
+      Cluster wide daily resolution data purge interval. Default is 2 years.
+    </description>
+  </property>
+  <property>
     <name>timeline.metrics.host.aggregator.ttl</name>
     <value>86400</value>
     <description>

Reply via email to