http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetricReader.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetricReader.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetricReader.java
deleted file mode 100644
index ea0913e..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetricReader.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-public class TimelineClusterMetricReader {
-
-  private boolean ignoreInstance;
-
-  public TimelineClusterMetricReader(boolean ignoreInstance) {
-    this.ignoreInstance = ignoreInstance;
-  }
-
-  public TimelineClusterMetric fromResultSet(ResultSet rs)
-    throws SQLException {
-
-    return new TimelineClusterMetric(
-      rs.getString("METRIC_NAME"),
-      rs.getString("APP_ID"),
-      ignoreInstance ? null : rs.getString("INSTANCE_ID"),
-      rs.getLong("SERVER_TIME"),
-      rs.getString("UNITS"));
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
deleted file mode 100644
index 459e612..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.Map;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
-
-public class TimelineMetricAggregator extends AbstractTimelineAggregator {
-  private static final Log LOG = LogFactory.getLog
-    (TimelineMetricAggregator.class);
-
-  private final String checkpointLocation;
-  private final Long sleepIntervalMillis;
-  private final Integer checkpointCutOffMultiplier;
-  private final String hostAggregatorDisabledParam;
-  private final String tableName;
-  private final String outputTableName;
-  private final Long nativeTimeRangeDelay;
-
-  public TimelineMetricAggregator(PhoenixHBaseAccessor hBaseAccessor,
-                                  Configuration metricsConf,
-                                  String checkpointLocation,
-                                  Long sleepIntervalMillis,
-                                  Integer checkpointCutOffMultiplier,
-                                  String hostAggregatorDisabledParam,
-                                  String tableName,
-                                  String outputTableName,
-                                  Long nativeTimeRangeDelay) {
-    super(hBaseAccessor, metricsConf);
-    this.checkpointLocation = checkpointLocation;
-    this.sleepIntervalMillis = sleepIntervalMillis;
-    this.checkpointCutOffMultiplier = checkpointCutOffMultiplier;
-    this.hostAggregatorDisabledParam = hostAggregatorDisabledParam;
-    this.tableName = tableName;
-    this.outputTableName = outputTableName;
-    this.nativeTimeRangeDelay =  nativeTimeRangeDelay;
-  }
-
-  @Override
-  protected String getCheckpointLocation() {
-    return checkpointLocation;
-  }
-
-  @Override
-  protected void aggregate(ResultSet rs, long startTime, long endTime)
-    throws IOException, SQLException {
-    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
-      aggregateMetricsFromResultSet(rs);
-
-    LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
-    hBaseAccessor.saveHostAggregateRecords(hostAggregateMap,
-      outputTableName);
-  }
-
-  @Override
-  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
-    Condition condition = new DefaultCondition(null, null, null, null, startTime,
-      endTime, null, null, true);
-    condition.setNoLimit();
-    condition.setFetchSize(resultsetFetchSize);
-    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
-      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, nativeTimeRangeDelay),
-      tableName));
-    condition.addOrderByColumn("METRIC_NAME");
-    condition.addOrderByColumn("HOSTNAME");
-    condition.addOrderByColumn("APP_ID");
-    condition.addOrderByColumn("INSTANCE_ID");
-    condition.addOrderByColumn("SERVER_TIME");
-    return condition;
-  }
-
-  private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet
-      (ResultSet rs) throws IOException, SQLException {
-    TimelineMetric existingMetric = null;
-    MetricHostAggregate hostAggregate = null;
-    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
-      new HashMap<TimelineMetric, MetricHostAggregate>();
-
-    while (rs.next()) {
-      TimelineMetric currentMetric =
-        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
-      MetricHostAggregate currentHostAggregate =
-        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
-
-      if (existingMetric == null) {
-        // First row
-        existingMetric = currentMetric;
-        hostAggregate = new MetricHostAggregate();
-        hostAggregateMap.put(currentMetric, hostAggregate);
-      }
-
-      if (existingMetric.equalsExceptTime(currentMetric)) {
-        // Recalculate totals with current metric
-        hostAggregate.updateAggregates(currentHostAggregate);
-      } else {
-        // Switched over to a new metric - save existing - create new aggregate
-        hostAggregate = new MetricHostAggregate();
-        hostAggregate.updateAggregates(currentHostAggregate);
-        hostAggregateMap.put(currentMetric, hostAggregate);
-        existingMetric = currentMetric;
-      }
-    }
-    return hostAggregateMap;
-  }
-
-  @Override
-  protected Long getSleepIntervalMillis() {
-    return sleepIntervalMillis;
-  }
-
-  @Override
-  protected Integer getCheckpointCutOffMultiplier() {
-    return checkpointCutOffMultiplier;
-  }
-
-  @Override
-  protected boolean isDisabled() {
-    return metricsConf.getBoolean(hostAggregatorDisabledParam, false);
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
deleted file mode 100644
index 1dfd3e6..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-import org.apache.commons.io.FilenameUtils;
-import org.apache.hadoop.conf.Configuration;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_DISABLED;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_DISABLED;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
-
-/**
- *
- */
-public class TimelineMetricAggregatorFactory {
-  private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE =
-    "timeline-metrics-host-aggregator-checkpoint";
-  private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE =
-    "timeline-metrics-host-aggregator-hourly-checkpoint";
-
-  public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
-    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
-
-    String checkpointDir = metricsConf.get(
-      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-    String checkpointLocation = FilenameUtils.concat(checkpointDir,
-      MINUTE_AGGREGATE_CHECKPOINT_FILE);
-    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
-      (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l));  // 5 mins
-
-    int checkpointCutOffMultiplier = metricsConf.getInt
-      (HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3);
-    String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED;
-
-    String inputTableName = METRICS_RECORD_TABLE_NAME;
-    String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-
-    return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
-      checkpointLocation,
-      sleepIntervalMillis,
-      checkpointCutOffMultiplier,
-      hostAggregatorDisabledParam,
-      inputTableName,
-      outputTableName,
-      120000l);
-  }
-
-  public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
-    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
-
-    String checkpointDir = metricsConf.get(
-      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-    String checkpointLocation = FilenameUtils.concat(checkpointDir,
-      MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE);
-    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
-      (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
-
-    int checkpointCutOffMultiplier = metricsConf.getInt
-      (HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
-    String hostAggregatorDisabledParam = HOST_AGGREGATOR_HOUR_DISABLED;
-
-    String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-    String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
-
-    return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
-      checkpointLocation,
-      sleepIntervalMillis,
-      checkpointCutOffMultiplier,
-      hostAggregatorDisabledParam,
-      inputTableName,
-      outputTableName,
-      3600000l);
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
deleted file mode 100644
index f595d5e..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-
-import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
-
-/**
- * Aggregates a metric across all hosts in the cluster. Reads metrics from
- * the precision table and saves into the aggregate.
- */
-public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
-  private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregator.class);
-  private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE =
-    "timeline-metrics-cluster-aggregator-checkpoint";
-  private final String checkpointLocation;
-  private final Long sleepIntervalMillis;
-  public final int timeSliceIntervalMillis;
-  private final Integer checkpointCutOffMultiplier;
-  private TimelineMetricReader timelineMetricReader =
-    new TimelineMetricReader(true);
-
-  public TimelineMetricClusterAggregator(PhoenixHBaseAccessor hBaseAccessor,
-                                         Configuration metricsConf) {
-    super(hBaseAccessor, metricsConf);
-
-    String checkpointDir = metricsConf.get(
-      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-
-    checkpointLocation = FilenameUtils.concat(checkpointDir,
-      CLUSTER_AGGREGATOR_CHECKPOINT_FILE);
-
-    sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
-      (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 120l));
-    timeSliceIntervalMillis = (int)SECONDS.toMillis(metricsConf.getInt
-      (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 15));
-    checkpointCutOffMultiplier =
-      metricsConf.getInt(CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
-  }
-
-  @Override
-  protected String getCheckpointLocation() {
-    return checkpointLocation;
-  }
-
-  @Override
-  protected void aggregate(ResultSet rs, long startTime, long endTime)
-    throws SQLException, IOException {
-    List<Long[]> timeSlices = getTimeSlices(startTime, endTime);
-    Map<TimelineClusterMetric, MetricClusterAggregate>
-      aggregateClusterMetrics = aggregateMetricsFromResultSet(rs, timeSlices);
-
-    LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
-    hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
-  }
-
-  @Override
-  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
-    Condition condition = new DefaultCondition(null, null, null, null, startTime,
-      endTime, null, null, true);
-    condition.setNoLimit();
-    condition.setFetchSize(resultsetFetchSize);
-    condition.setStatement(String.format(GET_METRIC_SQL,
-      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
-      METRICS_RECORD_TABLE_NAME));
-    condition.addOrderByColumn("METRIC_NAME");
-    condition.addOrderByColumn("APP_ID");
-    condition.addOrderByColumn("INSTANCE_ID");
-    condition.addOrderByColumn("SERVER_TIME");
-    return condition;
-  }
-
-  private List<Long[]> getTimeSlices(long startTime, long endTime) {
-    List<Long[]> timeSlices = new ArrayList<Long[]>();
-    long sliceStartTime = startTime;
-    while (sliceStartTime < endTime) {
-      timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis});
-      sliceStartTime += timeSliceIntervalMillis;
-    }
-    return timeSlices;
-  }
-
-  private Map<TimelineClusterMetric, MetricClusterAggregate>
-  aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
-    throws SQLException, IOException {
-    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
-      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
-    // Create time slices
-
-    while (rs.next()) {
-      TimelineMetric metric =
-        timelineMetricReader.getTimelineMetricFromResultSet(rs);
-
-      Map<TimelineClusterMetric, Double> clusterMetrics =
-        sliceFromTimelineMetric(metric, timeSlices);
-
-      if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
-        for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
-            clusterMetrics.entrySet()) {
-          TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
-          MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
-          Double avgValue = clusterMetricEntry.getValue();
-
-          if (aggregate == null) {
-            aggregate = new MetricClusterAggregate(avgValue, 1, null,
-              avgValue, avgValue);
-            aggregateClusterMetrics.put(clusterMetric, aggregate);
-          } else {
-            aggregate.updateSum(avgValue);
-            aggregate.updateNumberOfHosts(1);
-            aggregate.updateMax(avgValue);
-            aggregate.updateMin(avgValue);
-          }
-        }
-      }
-    }
-    return aggregateClusterMetrics;
-  }
-
-  @Override
-  protected Long getSleepIntervalMillis() {
-    return sleepIntervalMillis;
-  }
-
-  @Override
-  protected Integer getCheckpointCutOffMultiplier() {
-    return checkpointCutOffMultiplier;
-  }
-
-  @Override
-  protected boolean isDisabled() {
-    return metricsConf.getBoolean(CLUSTER_AGGREGATOR_MINUTE_DISABLED, false);
-  }
-
-  private Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
-        TimelineMetric timelineMetric, List<Long[]> timeSlices) {
-
-    if (timelineMetric.getMetricValues().isEmpty()) {
-      return null;
-    }
-
-    Map<TimelineClusterMetric, Double> timelineClusterMetricMap =
-      new HashMap<TimelineClusterMetric, Double>();
-
-    for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) {
-      // TODO: investigate null values - pre filter
-      if (metric.getValue() == null) {
-        continue;
-      }
-      Long timestamp = getSliceTimeForMetric(timeSlices,
-                       Long.parseLong(metric.getKey().toString()));
-      if (timestamp != -1) {
-        // Metric is within desired time range
-        TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
-          timelineMetric.getMetricName(),
-          timelineMetric.getAppId(),
-          timelineMetric.getInstanceId(),
-          timestamp,
-          timelineMetric.getType());
-        if (!timelineClusterMetricMap.containsKey(clusterMetric)) {
-          timelineClusterMetricMap.put(clusterMetric, metric.getValue());
-        } else {
-          Double oldValue = timelineClusterMetricMap.get(clusterMetric);
-          Double newValue = (oldValue + metric.getValue()) / 2;
-          timelineClusterMetricMap.put(clusterMetric, newValue);
-        }
-      }
-    }
-
-    return timelineClusterMetricMap;
-  }
-
-  /**
-   * Return beginning of the time slice into which the metric fits.
-   */
-  private Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) {
-    for (Long[] timeSlice : timeSlices) {
-      if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) {
-        return timeSlice[0];
-      }
-    }
-    return -1l;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
deleted file mode 100644
index c76ec60..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline;
-
-import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.Map;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
-
-public class TimelineMetricClusterAggregatorHourly extends
-  AbstractTimelineAggregator {
-  private static final Log LOG = LogFactory.getLog
-    (TimelineMetricClusterAggregatorHourly.class);
-  private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE =
-    "timeline-metrics-cluster-aggregator-hourly-checkpoint";
-  private final String checkpointLocation;
-  private final long sleepIntervalMillis;
-  private final Integer checkpointCutOffMultiplier;
-  private long checkpointCutOffIntervalMillis;
-  private static final Long NATIVE_TIME_RANGE_DELTA = 3600000l; // 1 hour
-  private final TimelineClusterMetricReader timelineClusterMetricReader
-     = new TimelineClusterMetricReader(true);
-
-  public TimelineMetricClusterAggregatorHourly(
-    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
-    super(hBaseAccessor, metricsConf);
-
-    String checkpointDir = metricsConf.get(
-      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-
-    checkpointLocation = FilenameUtils.concat(checkpointDir,
-      CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE);
-
-    sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
-      (CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
-    checkpointCutOffIntervalMillis =  SECONDS.toMillis(metricsConf.getLong
-      (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL, 7200l));
-    checkpointCutOffMultiplier = metricsConf.getInt
-      (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
-  }
-
-  @Override
-  protected String getCheckpointLocation() {
-    return checkpointLocation;
-  }
-
-  @Override
-  protected void aggregate(ResultSet rs, long startTime, long endTime)
-    throws SQLException, IOException {
-      Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
-        aggregateMetricsFromResultSet(rs);
-
-    LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
-    hBaseAccessor.saveClusterAggregateHourlyRecords(hostAggregateMap,
-      METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
-  }
-
-  @Override
-  protected Condition prepareMetricQueryCondition(long startTime,
-                                                  long endTime) {
-    Condition condition = new DefaultCondition(null, null, null, null, 
startTime,
-      endTime, null, null, true);
-    condition.setNoLimit();
-    condition.setFetchSize(resultsetFetchSize);
-    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
-      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, 
NATIVE_TIME_RANGE_DELTA),
-        METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
-    condition.addOrderByColumn("METRIC_NAME");
-    condition.addOrderByColumn("APP_ID");
-    condition.addOrderByColumn("INSTANCE_ID");
-    condition.addOrderByColumn("SERVER_TIME");
-    return condition;
-  }
-
-  private Map<TimelineClusterMetric, MetricHostAggregate>
-  aggregateMetricsFromResultSet(ResultSet rs) throws IOException, SQLException 
{
-
-    TimelineClusterMetric existingMetric = null;
-    MetricHostAggregate hostAggregate = null;
-    Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
-      new HashMap<TimelineClusterMetric, MetricHostAggregate>();
-
-    while (rs.next()) {
-      TimelineClusterMetric currentMetric =
-        timelineClusterMetricReader.fromResultSet(rs);
-      MetricClusterAggregate currentHostAggregate =
-        getMetricClusterAggregateFromResultSet(rs);
-
-      if (existingMetric == null) {
-        // First row
-        existingMetric = currentMetric;
-        hostAggregate = new MetricHostAggregate();
-        hostAggregateMap.put(currentMetric, hostAggregate);
-      }
-
-      if (existingMetric.equalsExceptTime(currentMetric)) {
-        // Recalculate totals with current metric
-        updateAggregatesFromHost(hostAggregate, currentHostAggregate);
-
-      } else {
-        // Switched over to a new metric - save existing
-        hostAggregate = new MetricHostAggregate();
-        updateAggregatesFromHost(hostAggregate, currentHostAggregate);
-        hostAggregateMap.put(currentMetric, hostAggregate);
-        existingMetric = currentMetric;
-      }
-
-    }
-
-    return hostAggregateMap;
-  }
-
-  private void updateAggregatesFromHost(
-    MetricHostAggregate agg,
-    MetricClusterAggregate currentClusterAggregate) {
-    agg.updateMax(currentClusterAggregate.getMax());
-    agg.updateMin(currentClusterAggregate.getMin());
-    agg.updateSum(currentClusterAggregate.getSum());
-    agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts());
-  }
-
-  @Override
-  protected Long getSleepIntervalMillis() {
-    return sleepIntervalMillis;
-  }
-
-  @Override
-  protected Integer getCheckpointCutOffMultiplier() {
-    return checkpointCutOffMultiplier;
-  }
-
-  @Override
-  protected Long getCheckpointCutOffIntervalMillis() {
-    return checkpointCutOffIntervalMillis;
-  }
-
-  @Override
-  protected boolean isDisabled() {
-    return metricsConf.getBoolean(CLUSTER_AGGREGATOR_HOUR_DISABLED, false);
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 248894c..b72aa64 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -17,6 +17,7 @@
  */
 package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -128,6 +129,11 @@ public class TimelineMetricConfiguration {
   public static final String TIMELINE_SERVICE_RPC_ADDRESS =
     "timeline.metrics.service.rpc.address";
 
+  public static final String CLUSTER_AGGREGATOR_APP_IDS =
+    "timeline.metrics.service.cluster.aggregator.appIds";
+
+  public static final String HOST_APP_ID = "HOST";
+
   private Configuration hbaseConf;
   private Configuration metricsConf;
   private volatile boolean isInitialized = false;

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricReader.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricReader.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricReader.java
deleted file mode 100644
index aa349f0..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricReader.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Map;
-import java.util.TreeMap;
-
-public class TimelineMetricReader {
-
-  private boolean ignoreInstance = false;
-
-  public TimelineMetricReader() {}
-
-  public TimelineMetricReader(boolean ignoreInstance) {
-    this.ignoreInstance = ignoreInstance;
-  }
-
-  public TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
-    throws SQLException, IOException {
-    TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs);
-    Map<Long, Double> sortedByTimeMetrics = new TreeMap<Long, Double>(
-        PhoenixHBaseAccessor.readMetricFromJSON(rs.getString("METRICS")));
-    metric.setMetricValues(sortedByTimeMetrics);
-    return metric;
-  }
-
-  /**
-   * Returns common part of timeline metrics record without the values.
-   */
-  public TimelineMetric getTimelineMetricCommonsFromResultSet(ResultSet rs)
-    throws SQLException {
-    TimelineMetric metric = new TimelineMetric();
-    metric.setMetricName(rs.getString("METRIC_NAME"));
-    metric.setAppId(rs.getString("APP_ID"));
-    if (!ignoreInstance) metric.setInstanceId(rs.getString("INSTANCE_ID"));
-    metric.setHostName(rs.getString("HOSTNAME"));
-    metric.setTimestamp(rs.getLong("SERVER_TIME"));
-    metric.setStartTime(rs.getLong("START_TIME"));
-    metric.setType(rs.getString("UNITS"));
-    return metric;
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
new file mode 100644
index 0000000..8dea46e
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Date;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE;
+
+public abstract class AbstractTimelineAggregator implements Runnable {
+  protected final PhoenixHBaseAccessor hBaseAccessor;
+  private final Log LOG;
+
+  private Clock clock;
+  protected final long checkpointDelayMillis;
+  protected final Integer resultsetFetchSize;
+  protected Configuration metricsConf;
+
+  /**
+   * Creates an aggregator that takes the current time from a
+   * {@link SystemClock}.
+   */
+  public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
+                                    Configuration metricsConf) {
+    this(hBaseAccessor, metricsConf, new SystemClock());
+  }
+
+  /**
+   * Creates an aggregator with an explicit clock.
+   *
+   * @param hBaseAccessor accessor used to obtain connections
+   * @param metricsConf configuration supplying the checkpoint delay
+   *                    (seconds, default 120) and the result set fetch size
+   *                    (default 2000)
+   * @param clk time source for all checkpoint calculations
+   */
+  public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
+                                    Configuration metricsConf, Clock clk) {
+    this.LOG = LogFactory.getLog(getClass());
+    this.clock = clk;
+    this.hBaseAccessor = hBaseAccessor;
+    this.metricsConf = metricsConf;
+
+    int checkpointDelaySeconds =
+      metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120);
+    this.checkpointDelayMillis = SECONDS.toMillis(checkpointDelaySeconds);
+    this.resultsetFetchSize = metricsConf.getInt(RESULTSET_FETCH_SIZE, 2000);
+  }
+
+  /**
+   * Aggregator thread body: repeatedly runs one aggregation pass and then
+   * sleeps for the duration that pass asks for. The loop never terminates;
+   * an interrupt only cuts the current sleep short.
+   */
+  @Override
+  public void run() {
+    LOG.info("Started Timeline aggregator thread @ " + new Date());
+    final Long sleepInterval = getSleepIntervalMillis();
+
+    for (;;) {
+      long pauseMillis = runOnce(sleepInterval);
+
+      try {
+        Thread.sleep(pauseMillis);
+      } catch (InterruptedException e) {
+        LOG.info("Sleep interrupted, continuing with aggregation.");
+      }
+    }
+  }
+
+  /**
+   * Runs a single aggregation pass. Access relaxed for tests.
+   *
+   * If a usable checkpoint exists, aggregates the window
+   * [checkpoint, checkpoint + SLEEP_INTERVAL) and, on success, advances the
+   * checkpoint. Without a usable checkpoint nothing is aggregated and the
+   * full interval is returned.
+   *
+   * @param SLEEP_INTERVAL aggregation window length and nominal sleep time,
+   *                       in milliseconds
+   * @return milliseconds the caller should sleep before the next pass
+   */
+  public long runOnce(Long SLEEP_INTERVAL) {
+    long currentTime = clock.getTime();
+    long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime);
+    long sleepTime = SLEEP_INTERVAL;
+
+    if (lastCheckPointTime != -1) {
+      LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: "
+        + ((clock.getTime() - lastCheckPointTime) / 1000)
+        + " seconds.");
+
+      long startTime = clock.getTime();
+      boolean success = doWork(lastCheckPointTime,
+        lastCheckPointTime + SLEEP_INTERVAL);
+      long executionTime = clock.getTime() - startTime;
+      long delta = SLEEP_INTERVAL - executionTime;
+
+      if (delta > 0) {
+        // Sleep for (configured sleep - time to execute task)
+        sleepTime = delta;
+      } else {
+        // No sleep because last run took too long to execute
+        LOG.info("Aggregator execution took too long, " +
+          "cancelling sleep. executionTime = " + executionTime);
+        sleepTime = 1;
+      }
+
+      LOG.debug("Aggregator sleep interval = " + sleepTime);
+
+      if (success) {
+        try {
+          // The checkpoint records what has already been aggregated, so it
+          // must never point into the future: cap it at the time this pass
+          // started instead of blindly saving checkpoint + interval.
+          saveCheckPoint(Math.min(currentTime, lastCheckPointTime +
+            SLEEP_INTERVAL));
+        } catch (IOException io) {
+          // Keep the cause in the log; the next pass restarts from the
+          // checkpoint that is still on disk.
+          LOG.warn("Error saving checkpoint, restarting aggregation at " +
+            "previous checkpoint.", io);
+        }
+      }
+    }
+
+    return sleepTime;
+  }
+
+  /**
+   * Reads the stored checkpoint, discarding it when it is too old.
+   *
+   * When no usable checkpoint remains, a fresh one is written at
+   * (currentTime - checkpointDelayMillis) and -1 is still returned, so the
+   * calling pass performs no aggregation.
+   *
+   * @param currentTime time at which the current pass started
+   * @return the usable checkpoint time, or -1 if there is none
+   */
+  private long readLastCheckpointSavingOnFirstRun(long currentTime) {
+    long lastCheckPointTime = -1;
+
+    try {
+      lastCheckPointTime = readCheckPoint();
+      if (isLastCheckPointTooOld(lastCheckPointTime)) {
+        LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " +
+          "lastCheckPointTime = " + lastCheckPointTime);
+        lastCheckPointTime = -1;
+      }
+      if (lastCheckPointTime == -1) {
+        // Assuming first run, save checkpoint and sleep.
+        // The checkpoint is placed checkpointDelayMillis in the past to
+        // allow the agents/collectors to catch up.
+        long firstCheckpoint = currentTime - checkpointDelayMillis;
+        LOG.info("Saving checkpoint time on first run: " + firstCheckpoint);
+        saveCheckPoint(firstCheckpoint);
+      }
+    } catch (IOException io) {
+      LOG.warn("Unable to write last checkpoint time. Resuming sleep.", io);
+    }
+    return lastCheckPointTime;
+  }
+
+  /**
+   * Tells whether a stored checkpoint lies further back than the cut-off
+   * interval. A missing checkpoint (-1) is never considered too old.
+   */
+  private boolean isLastCheckPointTooOld(long checkpoint) {
+    if (checkpoint == -1) {
+      return false;
+    }
+    // The very first checkpoint is written checkpointDelayMillis in the
+    // past, so that delay is subtracted from the age before comparing.
+    long age = clock.getTime() - checkpoint - checkpointDelayMillis;
+    return age > getCheckpointCutOffIntervalMillis();
+  }
+
+  /**
+   * Reads the checkpoint time from the checkpoint file.
+   *
+   * Surrounding whitespace (for example a trailing newline) is ignored. A
+   * file whose content is not a number is treated like a missing checkpoint
+   * rather than letting the NumberFormatException escape to the caller.
+   *
+   * @return the stored checkpoint time, or -1 when the file is absent,
+   *         empty, unreadable or unparseable
+   */
+  protected long readCheckPoint() {
+    try {
+      File checkpoint = new File(getCheckpointLocation());
+      if (checkpoint.exists()) {
+        String contents = FileUtils.readFileToString(checkpoint);
+        if (contents != null) {
+          contents = contents.trim();
+          if (!contents.isEmpty()) {
+            return Long.parseLong(contents);
+          }
+        }
+      }
+    } catch (IOException io) {
+      LOG.debug(io);
+    } catch (NumberFormatException nfe) {
+      LOG.warn("Ignoring unparseable checkpoint at " +
+        getCheckpointLocation(), nfe);
+    }
+    return -1;
+  }
+
+  /**
+   * Writes the checkpoint time to the checkpoint file, creating the file
+   * first when it does not exist yet.
+   *
+   * @param checkpointTime time to store
+   * @throws IOException if the file cannot be created or written
+   */
+  protected void saveCheckPoint(long checkpointTime) throws IOException {
+    File checkpointFile = new File(getCheckpointLocation());
+    boolean missing = !checkpointFile.exists();
+    if (missing && !checkpointFile.createNewFile()) {
+      throw new IOException("Could not create checkpoint at location, " +
+        getCheckpointLocation());
+    }
+    FileUtils.writeStringToFile(checkpointFile,
+      String.valueOf(checkpointTime));
+  }
+
+  /**
+   * Queries the metrics for the given interval and hands the result set to
+   * {@link #aggregate(ResultSet, long, long)}.
+   *
+   * The connection, statement and result set are always closed before
+   * returning; failures while closing are ignored.
+   *
+   * @param startTime Sample start time
+   * @param endTime Sample end time
+   * @return true if the query and aggregation finished without an
+   *         SQLException or IOException, false otherwise
+   */
+  public boolean doWork(long startTime, long endTime) {
+    LOG.info("Start aggregation cycle @ " + new Date() + ", " +
+      "startTime = " + new Date(startTime) +
+      ", endTime = " + new Date(endTime));
+
+    boolean success = true;
+    Condition condition = prepareMetricQueryCondition(startTime, endTime);
+
+    Connection conn = null;
+    PreparedStatement stmt = null;
+    ResultSet rs = null;
+
+    try {
+      conn = hBaseAccessor.getConnection();
+      stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+
+      LOG.debug("Query issued @: " + new Date());
+      rs = stmt.executeQuery();
+      LOG.debug("Query returned @: " + new Date());
+
+      aggregate(rs, startTime, endTime);
+
+    } catch (SQLException e) {
+      LOG.error("Exception during aggregating metrics.", e);
+      success = false;
+    } catch (IOException e) {
+      LOG.error("Exception during aggregating metrics.", e);
+      success = false;
+    } finally {
+      if (rs != null) {
+        try {
+          rs.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+
+    // Logged exactly once per cycle, whether or not the cycle succeeded.
+    LOG.info("End aggregation cycle @ " + new Date());
+    return success;
+  }
+
+  /**
+   * Builds the query condition for one aggregation window; doWork() passes
+   * it to PhoenixTransactSQL.prepareGetMetricsSqlStmt.
+   */
+  protected abstract Condition prepareMetricQueryCondition(long startTime,
+                                                           long endTime);
+
+  /**
+   * Consumes the result set of the window query. Called from doWork(); an
+   * IOException or SQLException thrown here makes that cycle report failure.
+   */
+  protected abstract void aggregate(ResultSet rs, long startTime, long endTime)
+    throws IOException, SQLException;
+
+  /**
+   * Interval in milliseconds that run() hands to runOnce(), where it serves
+   * as both the aggregation window length and the nominal sleep time.
+   */
+  protected abstract Long getSleepIntervalMillis();
+
+  /**
+   * Number of sleep intervals after which a stored checkpoint is considered
+   * too old (see getCheckpointCutOffIntervalMillis()).
+   */
+  protected abstract Integer getCheckpointCutOffMultiplier();
+
+  /**
+   * Maximum accepted checkpoint age in milliseconds: the cut-off multiplier
+   * times the sleep interval.
+   */
+  protected Long getCheckpointCutOffIntervalMillis() {
+    return getCheckpointCutOffMultiplier() * getSleepIntervalMillis();
+  }
+
+  // NOTE(review): not consulted anywhere in this class; presumably callers
+  // check it before scheduling the aggregator - confirm.
+  public abstract boolean isDisabled();
+
+  /** Path of the file in which the checkpoint time is read and written. */
+  protected abstract String getCheckpointLocation();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
new file mode 100644
index 0000000..ce79b6f
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+
+import java.util.Map;
+
+/**
+ * Helpers for reducing a series of metric values to summary statistics.
+ */
+public class AggregatorUtils {
+
+  /**
+   * Computes sum, max, min and count over the values of a metric series.
+   *
+   * Null values are skipped for sum, max and min but are still included in
+   * the count, which is the size of the map. When no value contributed to
+   * max or min (null/empty input, or only null values) they are reported
+   * as 0.0.
+   *
+   * @param metricValues timestamp to value map; may be null or empty
+   * @return a four element array: [0] sum, [1] max, [2] min, [3] count
+   */
+  public static double[] calculateAggregates(Map<Long, Double> metricValues) {
+    double[] values = new double[4];
+    // Infinities are the only safe "nothing seen yet" markers:
+    // Double.MIN_VALUE is the smallest POSITIVE double, so using it as the
+    // initial max made every all-negative series report a max of 0.0.
+    double max = Double.NEGATIVE_INFINITY;
+    double min = Double.POSITIVE_INFINITY;
+    double sum = 0.0;
+    int metricCount = 0;
+
+    if (metricValues != null && !metricValues.isEmpty()) {
+      for (Double value : metricValues.values()) {
+        // TODO: Some nulls in data - need to investigate null values from host
+        if (value != null) {
+          if (value > max) {
+            max = value;
+          }
+          if (value < min) {
+            min = value;
+          }
+          sum += value;
+        }
+      }
+      metricCount = metricValues.size();
+    }
+
+    values[0] = sum;
+    values[1] = max != Double.NEGATIVE_INFINITY ? max : 0.0;
+    values[2] = min != Double.POSITIVE_INFINITY ? min : 0.0;
+    values[3] = metricCount;
+
+    return values;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java
new file mode 100644
index 0000000..9d17fca
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+/**
+ * Is used to determine metrics aggregate table.
+ *
+ * @see 
org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.TimelineWebServices#getTimelineMetric
+ * @see 
org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.TimelineWebServices#getTimelineMetrics
+ */
+public class Function {
+  public static Function DEFAULT_VALUE_FUNCTION =
+    new Function(ReadFunction.VALUE, null);
+  // Regular expression matching the "._" that precedes each function suffix
+  private static final String SUFFIX_SEPARATOR = "\\._";
+
+  private ReadFunction readFunction = ReadFunction.VALUE;
+  private PostProcessingFunction postProcessingFunction = null;
+
+  /** Creates the default function: plain value, no post processing. */
+  public Function() {
+  }
+
+  /**
+   * @param readFunction aggregate to read; null keeps the default VALUE
+   * @param ppFunction post processing to apply; may be null
+   */
+  public Function(ReadFunction readFunction,
+                  PostProcessingFunction ppFunction) {
+    if (readFunction != null) {
+      this.readFunction = readFunction;
+    }
+    this.postProcessingFunction = ppFunction;
+  }
+
+  /**
+   * Derives the function from the suffixes of a metric name.
+   *
+   * "Metric._rate._avg" yields post processing RATE and read function AVG,
+   * "Metric._avg" yields read function AVG only. Any other number of
+   * "._"-separated parts yields the default VALUE function.
+   *
+   * @param metricName metric name, optionally carrying function suffixes
+   * @return the function described by the suffixes
+   * @throws FunctionFormatException if a suffix names no known function
+   */
+  public static Function fromMetricName(String metricName) {
+    String[] parts = metricName.split(SUFFIX_SEPARATOR);
+
+    ReadFunction readFunction = ReadFunction.VALUE;
+    PostProcessingFunction ppFunction = null;
+
+    if (parts.length == 3) {
+      ppFunction = PostProcessingFunction.getFunction(parts[1]);
+      readFunction = ReadFunction.getFunction(parts[2]);
+    } else if (parts.length == 2) {
+      readFunction = ReadFunction.getFunction(parts[1]);
+    }
+
+    return new Function(readFunction, ppFunction);
+  }
+
+  /**
+   * @return the post processing suffix (if any) followed by the read
+   *         function suffix
+   */
+  public String getSuffix() {
+    return (postProcessingFunction == null) ? readFunction.getSuffix() :
+      postProcessingFunction.getSuffix() + readFunction.getSuffix();
+  }
+
+  public ReadFunction getReadFunction() {
+    return readFunction;
+  }
+
+  @Override
+  public String toString() {
+    return "Function{" +
+      "readFunction=" + readFunction +
+      ", postProcessingFunction=" + postProcessingFunction +
+      '}';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof Function)) return false;
+
+    Function function = (Function) o;
+
+    return postProcessingFunction == function.postProcessingFunction
+      && readFunction == function.readFunction;
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = readFunction.hashCode();
+    result = 31 * result + (postProcessingFunction != null ?
+      postProcessingFunction.hashCode() : 0);
+    return result;
+  }
+
+  public enum PostProcessingFunction {
+    NONE(""),
+    RATE("._rate");
+
+    private final String suffix;
+
+    PostProcessingFunction(String suffix) {
+      this.suffix = suffix;
+    }
+
+    public String getSuffix() {
+      return suffix;
+    }
+
+    /**
+     * Looks a post processing function up by name, ignoring case.
+     *
+     * @param functionName function name without the "._" prefix; null
+     *                     means NONE
+     * @throws FunctionFormatException if the name matches no constant
+     */
+    public static PostProcessingFunction getFunction(String functionName)
+      throws FunctionFormatException {
+      if (functionName == null) {
+        return NONE;
+      }
+
+      try {
+        // Fixed locale: the default-locale toUpperCase() maps 'i' to a
+        // dotted capital under Turkish locales and breaks valueOf().
+        return PostProcessingFunction.valueOf(
+          functionName.toUpperCase(java.util.Locale.ENGLISH));
+      } catch (IllegalArgumentException e) {
+        throw new FunctionFormatException(
+          "Post processing function should be none, rate. Got " +
+            functionName, e);
+      }
+    }
+  }
+
+  public enum ReadFunction {
+    VALUE(""),
+    AVG("._avg"),
+    MIN("._min"),
+    MAX("._max"),
+    SUM("._sum");
+
+    private final String suffix;
+
+    ReadFunction(String suffix) {
+      this.suffix = suffix;
+    }
+
+    public String getSuffix() {
+      return suffix;
+    }
+
+    /**
+     * Looks a read function up by name, ignoring case.
+     *
+     * @param functionName function name without the "._" prefix; null
+     *                     means VALUE
+     * @throws FunctionFormatException if the name matches no constant
+     */
+    public static ReadFunction getFunction(String functionName) throws
+      FunctionFormatException {
+      if (functionName == null) {
+        return VALUE;
+      }
+      try {
+        // Fixed locale so that "min" upper-cases to "MIN" everywhere.
+        return ReadFunction.valueOf(
+          functionName.toUpperCase(java.util.Locale.ENGLISH));
+      } catch (IllegalArgumentException e) {
+        throw new FunctionFormatException(
+          "Function should be value, avg, min, max, sum. Got " +
+            functionName, e);
+      }
+    }
+  }
+
+  /** Signals a metric name suffix that names no known function. */
+  public static class FunctionFormatException
+    extends IllegalArgumentException {
+    public FunctionFormatException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java
new file mode 100644
index 0000000..825ac25
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.annotate.JsonSubTypes;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * Base holder for the running sum, deviation, maximum and minimum of a
+ * metric. The update methods fold one more value into sum, max and min;
+ * deviation is only ever assigned through the constructor or its setter.
+ *
+ * The fields are boxed and the update methods unbox them, so a null field or
+ * a null argument results in a NullPointerException.
+ */
+@JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class),
+  @JsonSubTypes.Type(value = MetricHostAggregate.class)})
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class MetricAggregate {
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  protected Double sum = 0.0;
+  protected Double deviation;
+  // Seed with the most negative finite double so that the first real sample,
+  // including zero or a negative one, replaces it. Double.MIN_VALUE is the
+  // smallest POSITIVE double and would mask every non-positive maximum.
+  // A finite seed (rather than negative infinity) keeps the default
+  // expressible as an ordinary JSON number.
+  protected Double max = -Double.MAX_VALUE;
+  protected Double min = Double.MAX_VALUE;
+
+  /**
+   * Creates an empty aggregate: sum 0.0, no deviation, and max/min seeded so
+   * that the first update replaces them.
+   */
+  public MetricAggregate() {
+  }
+
+  /**
+   * Creates an aggregate from already computed values, stored as given.
+   */
+  MetricAggregate(Double sum, Double deviation, Double max,
+                  Double min) {
+    this.sum = sum;
+    this.deviation = deviation;
+    this.max = max;
+    this.min = min;
+  }
+
+  /**
+   * Adds the given amount to the running sum.
+   */
+  public void updateSum(Double sum) {
+    this.sum += sum;
+  }
+
+  /**
+   * Raises the stored maximum if the candidate is strictly larger.
+   */
+  public void updateMax(Double max) {
+    if (max > this.max) {
+      this.max = max;
+    }
+  }
+
+  /**
+   * Lowers the stored minimum if the candidate is strictly smaller.
+   */
+  public void updateMin(Double min) {
+    if (min < this.min) {
+      this.min = min;
+    }
+  }
+
+  @JsonProperty("sum")
+  public Double getSum() {
+    return sum;
+  }
+
+  @JsonProperty("deviation")
+  public Double getDeviation() {
+    return deviation;
+  }
+
+  @JsonProperty("max")
+  public Double getMax() {
+    return max;
+  }
+
+  @JsonProperty("min")
+  public Double getMin() {
+    return min;
+  }
+
+  public void setSum(Double sum) {
+    this.sum = sum;
+  }
+
+  public void setDeviation(Double deviation) {
+    this.deviation = deviation;
+  }
+
+  public void setMax(Double max) {
+    this.max = max;
+  }
+
+  public void setMin(Double min) {
+    this.min = min;
+  }
+
+  /**
+   * Serializes this aggregate with the shared Jackson mapper.
+   *
+   * @return the JSON text
+   * @throws IOException if serialization fails
+   */
+  public String toJSON() throws IOException {
+    return mapper.writeValueAsString(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java
new file mode 100644
index 0000000..9c837b6
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * A {@link MetricAggregate} that additionally tracks how many hosts
+ * contributed to it.
+ */
+public class MetricClusterAggregate extends MetricAggregate {
+  private int numberOfHosts;
+
+  /**
+   * Creates an empty aggregate with the base-class defaults and a host count
+   * of zero.
+   */
+  @JsonCreator
+  public MetricClusterAggregate() {
+  }
+
+  /**
+   * Creates an aggregate from already computed values, stored as given.
+   */
+  public MetricClusterAggregate(Double sum, int numberOfHosts,
+                                Double deviation,
+                                Double max, Double min) {
+    super(sum, deviation, max, min);
+    this.numberOfHosts = numberOfHosts;
+  }
+
+  @JsonProperty("numberOfHosts")
+  public int getNumberOfHosts() {
+    return numberOfHosts;
+  }
+
+  /**
+   * Adds the given count to the number of hosts.
+   */
+  public void updateNumberOfHosts(int count) {
+    this.numberOfHosts += count;
+  }
+
+  public void setNumberOfHosts(int numberOfHosts) {
+    this.numberOfHosts = numberOfHosts;
+  }
+
+  /**
+   * Folds another cluster aggregate into this one: max and min are widened,
+   * sum and host count are added. Deviation is left untouched.
+   */
+  public void updateAggregates(MetricClusterAggregate hostAggregate) {
+    updateMax(hostAggregate.getMax());
+    updateMin(hostAggregate.getMin());
+    updateSum(hostAggregate.getSum());
+    updateNumberOfHosts(hostAggregate.getNumberOfHosts());
+  }
+
+  @Override
+  public String toString() {
+    // Label with this class's own name so log output is not mistaken for the
+    // base class, matching what MetricHostAggregate does.
+    return "MetricClusterAggregate{" +
+      "sum=" + sum +
+      ", numberOfHosts=" + numberOfHosts +
+      ", deviation=" + deviation +
+      ", max=" + max +
+      ", min=" + min +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java
new file mode 100644
index 0000000..340ec75
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Represents a collection of minute based aggregation of values for
+ * resolution greater than a minute.
+ */
+public class MetricHostAggregate extends MetricAggregate {
+
+  private long numberOfSamples = 0;
+
+  /**
+   * Creates an empty aggregate: sum and deviation 0.0, no samples, and
+   * max/min seeded so that the first update replaces them.
+   */
+  @JsonCreator
+  public MetricHostAggregate() {
+    // The max seed must be the most negative finite double. Double.MIN_VALUE
+    // is the smallest POSITIVE double and would hide zero or negative maxima.
+    super(0.0, 0.0, -Double.MAX_VALUE, Double.MAX_VALUE);
+  }
+
+  /**
+   * Creates an aggregate from already computed values, stored as given.
+   */
+  public MetricHostAggregate(Double sum, int numberOfSamples,
+                             Double deviation,
+                             Double max, Double min) {
+    super(sum, deviation, max, min);
+    this.numberOfSamples = numberOfSamples;
+  }
+
+  /**
+   * @return the sample count, reported as 1 when no samples were recorded so
+   *         that it is always safe to divide by
+   */
+  @JsonProperty("numberOfSamples")
+  public long getNumberOfSamples() {
+    return numberOfSamples == 0 ? 1 : numberOfSamples;
+  }
+
+  /**
+   * Adds the given count to the number of samples.
+   */
+  public void updateNumberOfSamples(long count) {
+    this.numberOfSamples += count;
+  }
+
+  public void setNumberOfSamples(long numberOfSamples) {
+    this.numberOfSamples = numberOfSamples;
+  }
+
+  /**
+   * @return sum divided by the sample count
+   */
+  public double getAvg() {
+    // Divide by the guarded count: the raw field may be 0, which would yield
+    // NaN or an infinity instead of a usable average.
+    return sum / getNumberOfSamples();
+  }
+
+  /**
+   * Folds another host aggregate into this one: max and min are widened,
+   * sum and sample count are added. Deviation is left untouched.
+   */
+  public void updateAggregates(MetricHostAggregate hostAggregate) {
+    updateMax(hostAggregate.getMax());
+    updateMin(hostAggregate.getMin());
+    updateSum(hostAggregate.getSum());
+    updateNumberOfSamples(hostAggregate.getNumberOfSamples());
+  }
+
+  @Override
+  public String toString() {
+    return "MetricHostAggregate{" +
+      "sum=" + sum +
+      ", numberOfSamples=" + numberOfSamples +
+      ", deviation=" + deviation +
+      ", max=" + max +
+      ", min=" + min +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java
new file mode 100644
index 0000000..3c30a6f
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+/**
+ * Immutable key describing one cluster metric data point: metric name,
+ * application id, instance id, timestamp and a type string.
+ *
+ * Equality and hashing use the name, application id, instance id and
+ * timestamp; the type does not take part. The application id and instance id
+ * may be null, the metric name may not.
+ */
+public class TimelineClusterMetric {
+  private final String metricName;
+  private final String appId;
+  private final String instanceId;
+  private final long timestamp;
+  private final String type;
+
+  public TimelineClusterMetric(String metricName, String appId,
+                               String instanceId,
+                               long timestamp, String type) {
+    this.metricName = metricName;
+    this.appId = appId;
+    this.instanceId = instanceId;
+    this.timestamp = timestamp;
+    this.type = type;
+  }
+
+  public String getMetricName() {
+    return metricName;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public String getType() { return type; }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TimelineClusterMetric that = (TimelineClusterMetric) o;
+
+    if (timestamp != that.timestamp) return false;
+    if (!sameOrBothNull(appId, that.appId)) return false;
+    if (!sameOrBothNull(instanceId, that.instanceId)) return false;
+    if (!metricName.equals(that.metricName)) return false;
+
+    return true;
+  }
+
+  /**
+   * Compares everything {@link #equals(Object)} compares except the
+   * timestamp.
+   *
+   * @param metric the metric to compare with; must not be null
+   * @return true if name, application id and instance id all match
+   */
+  public boolean equalsExceptTime(TimelineClusterMetric metric) {
+    if (!metricName.equals(metric.metricName)) return false;
+    // Null-safe, like equals() and hashCode(): a metric without an
+    // application id must not make this comparison throw.
+    if (!sameOrBothNull(appId, metric.appId)) return false;
+    if (!sameOrBothNull(instanceId, metric.instanceId)) return false;
+
+    return true;
+  }
+
+  /** Null-tolerant string equality: two nulls count as equal. */
+  private static boolean sameOrBothNull(String left, String right) {
+    return left != null ? left.equals(right) : right == null;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = metricName.hashCode();
+    result = 31 * result + (appId != null ? appId.hashCode() : 0);
+    result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
+    result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "TimelineClusterMetric{" +
+      "metricName='" + metricName + '\'' +
+      ", appId='" + appId + '\'' +
+      ", instanceId='" + instanceId + '\'' +
+      ", timestamp=" + timestamp +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetricReader.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetricReader.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetricReader.java
new file mode 100644
index 0000000..3df88d2
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetricReader.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * Builds {@link TimelineClusterMetric} instances from the current row of a
+ * JDBC result set.
+ */
+public class TimelineClusterMetricReader {
+
+  private boolean ignoreInstance;
+
+  /**
+   * @param ignoreInstance when true the INSTANCE_ID column is not read and
+   *                       the produced metric carries a null instance id
+   */
+  public TimelineClusterMetricReader(boolean ignoreInstance) {
+    this.ignoreInstance = ignoreInstance;
+  }
+
+  /**
+   * Reads the METRIC_NAME, APP_ID, optionally INSTANCE_ID, SERVER_TIME and
+   * UNITS columns of the current row, in that order.
+   *
+   * @param rs result set positioned on the row to read
+   * @return the metric described by that row
+   * @throws SQLException if a column cannot be read
+   */
+  public TimelineClusterMetric fromResultSet(ResultSet rs)
+    throws SQLException {
+
+    String name = rs.getString("METRIC_NAME");
+    String application = rs.getString("APP_ID");
+    String instance = null;
+    if (!ignoreInstance) {
+      instance = rs.getString("INSTANCE_ID");
+    }
+    long serverTime = rs.getLong("SERVER_TIME");
+    String units = rs.getString("UNITS");
+
+    return new TimelineClusterMetric(name, application, instance,
+      serverTime, units);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
new file mode 100644
index 0000000..a2887ea
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
+
+/**
+ * Aggregator that folds consecutive result-set rows belonging to the same
+ * metric into one {@link MetricHostAggregate} and saves the outcome to the
+ * configured output table. All tunables are supplied through the constructor
+ * and handed back to the base class through the overridden accessors.
+ */
+public class TimelineMetricAggregator extends AbstractTimelineAggregator {
+  private static final Log LOG =
+    LogFactory.getLog(TimelineMetricAggregator.class);
+
+  private final String checkpointLocation;
+  private final Long sleepIntervalMillis;
+  private final Integer checkpointCutOffMultiplier;
+  private final String hostAggregatorDisabledParam;
+  private final String tableName;
+  private final String outputTableName;
+  private final Long nativeTimeRangeDelay;
+
+  public TimelineMetricAggregator(PhoenixHBaseAccessor hBaseAccessor,
+                                  Configuration metricsConf,
+                                  String checkpointLocation,
+                                  Long sleepIntervalMillis,
+                                  Integer checkpointCutOffMultiplier,
+                                  String hostAggregatorDisabledParam,
+                                  String tableName,
+                                  String outputTableName,
+                                  Long nativeTimeRangeDelay) {
+    super(hBaseAccessor, metricsConf);
+    this.checkpointLocation = checkpointLocation;
+    this.sleepIntervalMillis = sleepIntervalMillis;
+    this.checkpointCutOffMultiplier = checkpointCutOffMultiplier;
+    this.hostAggregatorDisabledParam = hostAggregatorDisabledParam;
+    this.tableName = tableName;
+    this.outputTableName = outputTableName;
+    this.nativeTimeRangeDelay = nativeTimeRangeDelay;
+  }
+
+  @Override
+  protected String getCheckpointLocation() {
+    return checkpointLocation;
+  }
+
+  /**
+   * Builds the per-metric aggregates from the rows and saves them to the
+   * output table. The time bounds are not used here.
+   */
+  @Override
+  protected void aggregate(ResultSet rs, long startTime, long endTime)
+    throws IOException, SQLException {
+    Map<TimelineMetric, MetricHostAggregate> aggregatesByMetric =
+      aggregateMetricsFromResultSet(rs);
+
+    LOG.info("Saving " + aggregatesByMetric.size() + " metric aggregates.");
+    hBaseAccessor.saveHostAggregateRecords(aggregatesByMetric,
+      outputTableName);
+  }
+
+  /**
+   * Prepares an unlimited query over the given time range, using the
+   * aggregate-only statement formatted with a time range hint and the input
+   * table name, ordered by metric name, host, application, instance and
+   * server time.
+   */
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime,
+                                                  long endTime) {
+    Condition query = new DefaultCondition(null, null, null, null,
+      startTime, endTime, null, null, true);
+    query.setNoLimit();
+    query.setFetchSize(resultsetFetchSize);
+    query.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime,
+        nativeTimeRangeDelay),
+      tableName));
+    query.addOrderByColumn("METRIC_NAME");
+    query.addOrderByColumn("HOSTNAME");
+    query.addOrderByColumn("APP_ID");
+    query.addOrderByColumn("INSTANCE_ID");
+    query.addOrderByColumn("SERVER_TIME");
+    return query;
+  }
+
+  /**
+   * Walks the rows once. A run of rows that are equal except for time is
+   * folded into a single aggregate, stored under the first metric of that
+   * run.
+   */
+  private Map<TimelineMetric, MetricHostAggregate>
+      aggregateMetricsFromResultSet(ResultSet rs)
+      throws IOException, SQLException {
+    Map<TimelineMetric, MetricHostAggregate> aggregatesByMetric =
+      new HashMap<TimelineMetric, MetricHostAggregate>();
+    TimelineMetric runKey = null;
+    MetricHostAggregate runAggregate = null;
+
+    while (rs.next()) {
+      TimelineMetric rowMetric =
+        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+      MetricHostAggregate rowAggregate =
+        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+      if (runKey == null) {
+        // Very first row opens the first run.
+        runKey = rowMetric;
+        runAggregate = new MetricHostAggregate();
+        aggregatesByMetric.put(rowMetric, runAggregate);
+      }
+
+      if (!runKey.equalsExceptTime(rowMetric)) {
+        // A different metric starts here: open a new run seeded with it.
+        runAggregate = new MetricHostAggregate();
+        runAggregate.updateAggregates(rowAggregate);
+        aggregatesByMetric.put(rowMetric, runAggregate);
+        runKey = rowMetric;
+        continue;
+      }
+
+      // Same metric as the current run: fold the row into its totals.
+      runAggregate.updateAggregates(rowAggregate);
+    }
+    return aggregatesByMetric;
+  }
+
+  @Override
+  protected Long getSleepIntervalMillis() {
+    return sleepIntervalMillis;
+  }
+
+  @Override
+  protected Integer getCheckpointCutOffMultiplier() {
+    return checkpointCutOffMultiplier;
+  }
+
+  /**
+   * @return the boolean stored in the metrics configuration under the
+   *         disabled-parameter key, false when it is absent
+   */
+  @Override
+  public boolean isDisabled() {
+    return metricsConf.getBoolean(hostAggregatorDisabledParam, false);
+  }
+}

Reply via email to