http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
deleted file mode 100644
index 02cc207..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/**
- * Represents a collection of minute-based aggregations of values, for
- * resolutions greater than a minute.
- */
-public class MetricHostAggregate extends MetricAggregate {
-
-  private long numberOfSamples = 0;
-
-  @JsonCreator
-  public MetricHostAggregate() {
-    super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE);
-  }
-
-  public MetricHostAggregate(Double sum, int numberOfSamples,
-                             Double deviation,
-                             Double max, Double min) {
-    super(sum, deviation, max, min);
-    this.numberOfSamples = numberOfSamples;
-  }
-
-  @JsonProperty("numberOfSamples")
-  long getNumberOfSamples() {
-    return numberOfSamples == 0 ? 1 : numberOfSamples;
-  }
-
-  void updateNumberOfSamples(long count) {
-    this.numberOfSamples += count;
-  }
-
-  public void setNumberOfSamples(long numberOfSamples) {
-    this.numberOfSamples = numberOfSamples;
-  }
-
-  public double getAvg() {
-    return sum / numberOfSamples;
-  }
-
-  /**
-   * Find and update min, max and avg for a minute
-   */
-  void updateAggregates(MetricHostAggregate hostAggregate) {
-    updateMax(hostAggregate.getMax());
-    updateMin(hostAggregate.getMin());
-    updateSum(hostAggregate.getSum());
-    updateNumberOfSamples(hostAggregate.getNumberOfSamples());
-  }
-
-  @Override
-  public String toString() {
-    return "MetricHostAggregate{" +
-      "sum=" + sum +
-      ", numberOfSamples=" + numberOfSamples +
-      ", deviation=" + deviation +
-      ", max=" + max +
-      ", min=" + min +
-      '}';
-  }
-}
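
The class above folds one aggregate into another: sums and sample counts add, while max and min fold monotonically (hence the constructor seeding max with Double.MIN_VALUE and min with Double.MAX_VALUE). Below is a minimal, self-contained sketch of that merge logic; the real class extends MetricAggregate, which this diff does not show, so the class here is a hypothetical stand-in:

// Hypothetical, simplified stand-in for MetricHostAggregate's merge logic.
public class HostAggregateSketch {
  private double sum = 0.0;
  private double max = Double.MIN_VALUE;  // grows as aggregates fold in
  private double min = Double.MAX_VALUE;  // shrinks as aggregates fold in
  private long numberOfSamples = 0;

  // Mirrors updateAggregates(): merge another aggregate into this one.
  void merge(double otherSum, double otherMax, double otherMin, long otherCount) {
    sum += otherSum;
    max = Math.max(max, otherMax);
    min = Math.min(min, otherMin);
    numberOfSamples += otherCount;
  }

  // Average over all merged samples; guards division by zero the way
  // getNumberOfSamples() above does.
  double avg() {
    return sum / (numberOfSamples == 0 ? 1 : numberOfSamples);
  }

  public static void main(String[] args) {
    HostAggregateSketch agg = new HostAggregateSketch();
    agg.merge(10.0, 6.0, 4.0, 2);  // two samples from one minute
    agg.merge(3.0, 3.0, 3.0, 1);   // one sample from the next minute
    System.out.println("avg = " + agg.avg());  // 13.0 / 3
  }
}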

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java
deleted file mode 100644
index 88a427a..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline;
-
-/**
- * RuntimeException for initialization of metrics schema. It is RuntimeException
- * since this is not a recoverable situation, and should be handled by main or
- * service method followed by shutdown.
- */
-public class MetricsInitializationException extends RuntimeException {
-  public MetricsInitializationException() {
-  }
-
-  public MetricsInitializationException(String msg) {
-    super(msg);
-  }
-
-  public MetricsInitializationException(Throwable t) {
-    super(t);
-  }
-
-  public MetricsInitializationException(String msg, Throwable t) {
-    super(msg, t);
-  }
-
-}
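
Per the Javadoc above, the exception signals an unrecoverable schema failure and is expected to propagate to the main/service method, which then shuts the process down. A minimal sketch of that handling pattern; initSchema() is a hypothetical stand-in for PhoenixHBaseAccessor.initMetricSchema():

public class SchemaInitMain {
  // Hypothetical stand-in that fails the way initMetricSchema() can.
  static void initSchema() {
    throw new RuntimeException("simulated Phoenix schema failure");
  }

  public static void main(String[] args) {
    try {
      initSchema();
    } catch (RuntimeException e) {
      // Not recoverable: log and shut the service down.
      System.err.println("Metrics schema init failed: " + e.getMessage());
      System.exit(1);
    }
  }
}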

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
deleted file mode 100644
index 4f248b7..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ /dev/null
@@ -1,678 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.RetryCounterFactory;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_ENCODING;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_METRICS_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RETRY_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
-
-/**
- * Provides a facade over the Phoenix API to access HBase schema
- */
-public class PhoenixHBaseAccessor {
-
-  private static final Log LOG = LogFactory.getLog(PhoenixHBaseAccessor.class);
-  private final Configuration hbaseConf;
-  private final Configuration metricsConf;
-  private final RetryCounterFactory retryCounterFactory;
-
-  static final int PHOENIX_MAX_MUTATION_STATE_SIZE = 50000;
-  /**
-   * 4 metrics/min * 60 * 24: Retrieve data for 1 day.
-   */
-  private static final int METRICS_PER_MINUTE = 4;
-  public static int RESULTSET_LIMIT = (int)TimeUnit.DAYS.toMinutes(1) *
-    METRICS_PER_MINUTE;
-  private static ObjectMapper mapper = new ObjectMapper();
-
-  private static TypeReference<Map<Long, Double>> metricValuesTypeRef =
-    new TypeReference<Map<Long, Double>>() {};
-  private final ConnectionProvider dataSource;
-
-  public PhoenixHBaseAccessor(Configuration hbaseConf,
-                              Configuration metricsConf){
-    this(hbaseConf, metricsConf, new DefaultPhoenixDataSource(hbaseConf));
-  }
-
-  public PhoenixHBaseAccessor(Configuration hbaseConf,
-                              Configuration metricsConf,
-                              ConnectionProvider dataSource) {
-    this.hbaseConf = hbaseConf;
-    this.metricsConf = metricsConf;
-    RESULTSET_LIMIT = metricsConf.getInt(GLOBAL_RESULT_LIMIT, 5760);
-    try {
-      Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
-    } catch (ClassNotFoundException e) {
-      LOG.error("Phoenix client jar not found in the classpath.", e);
-      throw new IllegalStateException(e);
-    }
-    this.dataSource = dataSource;
-    this.retryCounterFactory = new RetryCounterFactory(
-      metricsConf.getInt(GLOBAL_MAX_RETRIES, 10),
-      (int) SECONDS.toMillis(metricsConf.getInt(GLOBAL_RETRY_INTERVAL, 5)));
-  }
-
-
-  private Connection getConnectionRetryingOnException()
-    throws SQLException, InterruptedException {
-    RetryCounter retryCounter = retryCounterFactory.create();
-    while (true) {
-      try{
-        return getConnection();
-      } catch (SQLException e) {
-        if(!retryCounter.shouldRetry()){
-          LOG.error("HBaseAccessor getConnection failed after "
-            + retryCounter.getMaxAttempts() + " attempts");
-          throw e;
-        }
-      }
-      retryCounter.sleepUntilNextRetry();
-    }
-  }
-
-
-  /**
-   * Get JDBC connection to HBase store. Assumption is that the hbase
-   * configuration is present on the classpath and loaded by the caller into
-   * the Configuration object.
-   * Phoenix already caches the HConnection between the client and HBase
-   * cluster.
-   *
-   * @return a {@link java.sql.Connection}
-   */
-  public Connection getConnection() throws SQLException {
-    return dataSource.getConnection();
-  }
-
-  public static Map readMetricFromJSON(String json) throws IOException {
-    return mapper.readValue(json, metricValuesTypeRef);
-  }
-
-  @SuppressWarnings("unchecked")
-  static TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
-    throws SQLException, IOException {
-    TimelineMetric metric = new TimelineMetric();
-    metric.setMetricName(rs.getString("METRIC_NAME"));
-    metric.setAppId(rs.getString("APP_ID"));
-    metric.setInstanceId(rs.getString("INSTANCE_ID"));
-    metric.setHostName(rs.getString("HOSTNAME"));
-    metric.setTimestamp(rs.getLong("SERVER_TIME"));
-    metric.setStartTime(rs.getLong("START_TIME"));
-    metric.setType(rs.getString("UNITS"));
-    metric.setMetricValues(
-      (Map<Long, Double>) readMetricFromJSON(rs.getString("METRICS")));
-    return metric;
-  }
-
-  static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs)
-    throws SQLException, IOException {
-    TimelineMetric metric = new TimelineMetric();
-    metric.setMetricName(rs.getString("METRIC_NAME"));
-    metric.setAppId(rs.getString("APP_ID"));
-    metric.setInstanceId(rs.getString("INSTANCE_ID"));
-    metric.setHostName(rs.getString("HOSTNAME"));
-    metric.setTimestamp(rs.getLong("SERVER_TIME"));
-    metric.setType(rs.getString("UNITS"));
-    return metric;
-  }
-
-  static MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs)
-    throws SQLException {
-    MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
-    metricHostAggregate.setSum(rs.getDouble("METRIC_SUM"));
-    metricHostAggregate.setMax(rs.getDouble("METRIC_MAX"));
-    metricHostAggregate.setMin(rs.getDouble("METRIC_MIN"));
-    metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT"));
-
-    metricHostAggregate.setDeviation(0.0);
-    return metricHostAggregate;
-  }
-
-  static TimelineClusterMetric
-  getTimelineMetricClusterKeyFromResultSet(ResultSet rs)
-    throws SQLException, IOException {
-    TimelineClusterMetric metric = new TimelineClusterMetric(
-      rs.getString("METRIC_NAME"),
-      rs.getString("APP_ID"),
-      rs.getString("INSTANCE_ID"),
-      rs.getLong("SERVER_TIME"),
-      rs.getString("UNITS"));
-
-    return metric;
-  }
-
-  static MetricClusterAggregate
-  getMetricClusterAggregateFromResultSet(ResultSet rs)
-    throws SQLException {
-    MetricClusterAggregate agg = new MetricClusterAggregate();
-    agg.setSum(rs.getDouble("METRIC_SUM"));
-    agg.setMax(rs.getDouble("METRIC_MAX"));
-    agg.setMin(rs.getDouble("METRIC_MIN"));
-    agg.setNumberOfHosts(rs.getInt("HOSTS_COUNT"));
-
-    agg.setDeviation(0.0);
-
-    return agg;
-  }
-
-  protected void initMetricSchema() {
-    Connection conn = null;
-    Statement stmt = null;
-
-    String encoding = metricsConf.get(HBASE_ENCODING_SCHEME, DEFAULT_ENCODING);
-    String compression = metricsConf.get(HBASE_COMPRESSION_SCHEME, DEFAULT_TABLE_COMPRESSION);
-    String precisionTtl = metricsConf.get(PRECISION_TABLE_TTL, "86400");
-    String hostMinTtl = metricsConf.get(HOST_MINUTE_TABLE_TTL, "604800");
-    String hostHourTtl = metricsConf.get(HOST_HOUR_TABLE_TTL, "2592000");
-    String clusterMinTtl = metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "2592000");
-    String clusterHourTtl = metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "31536000");
-
-    try {
-      LOG.info("Initializing metrics schema...");
-      conn = getConnectionRetryingOnException();
-      stmt = conn.createStatement();
-
-      stmt.executeUpdate(String.format(CREATE_METRICS_TABLE_SQL,
-        encoding, precisionTtl, compression));
-      stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL,
-        encoding, hostHourTtl, compression));
-      stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL,
-        encoding, hostMinTtl, compression));
-      stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL,
-        encoding, clusterMinTtl, compression));
-      stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL,
-        encoding, clusterHourTtl, compression));
-      conn.commit();
-    } catch (SQLException sql) {
-      LOG.warn("Error creating Metrics Schema in HBase using Phoenix.", sql);
-      throw new MetricsInitializationException(
-        "Error creating Metrics Schema in HBase using Phoenix.", sql);
-    } catch (InterruptedException e) {
-      LOG.warn("Error creating Metrics Schema in HBase using Phoenix.", e);
-      throw new MetricsInitializationException(
-        "Error creating Metrics Schema in HBase using Phoenix.", e);
-    } finally {
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-    }
-  }
-
-  public void insertMetricRecords(TimelineMetrics metrics)
-    throws SQLException, IOException {
-
-    List<TimelineMetric> timelineMetrics = metrics.getMetrics();
-    if (timelineMetrics == null || timelineMetrics.isEmpty()) {
-      LOG.debug("Empty metrics insert request.");
-      return;
-    }
-
-    Connection conn = getConnection();
-    PreparedStatement metricRecordStmt = null;
-    long currentTime = System.currentTimeMillis();
-
-    try {
-      metricRecordStmt = conn.prepareStatement(String.format(
-        UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME));
-
-      for (TimelineMetric metric : timelineMetrics) {
-        metricRecordStmt.clearParameters();
-
-        LOG.trace("host: " + metric.getHostName() + ", " +
-          "metricName = " + metric.getMetricName() + ", " +
-          "values: " + metric.getMetricValues());
-        Aggregator agg = new Aggregator();
-        double[] aggregates =  agg.calculateAggregates(
-          metric.getMetricValues());
-
-        metricRecordStmt.setString(1, metric.getMetricName());
-        metricRecordStmt.setString(2, metric.getHostName());
-        metricRecordStmt.setString(3, metric.getAppId());
-        metricRecordStmt.setString(4, metric.getInstanceId());
-        metricRecordStmt.setLong(5, currentTime);
-        metricRecordStmt.setLong(6, metric.getStartTime());
-        metricRecordStmt.setString(7, metric.getType());
-        metricRecordStmt.setDouble(8, aggregates[0]);
-        metricRecordStmt.setDouble(9, aggregates[1]);
-        metricRecordStmt.setDouble(10, aggregates[2]);
-        metricRecordStmt.setLong(11, (long)aggregates[3]);
-        String json =
-          TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
-        metricRecordStmt.setString(12, json);
-
-        try {
-          metricRecordStmt.executeUpdate();
-        } catch (SQLException sql) {
-          LOG.error(sql);
-        }
-      }
-
-      conn.commit();
-
-    } finally {
-      if (metricRecordStmt != null) {
-        try {
-          metricRecordStmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
-  }
-
-
-  @SuppressWarnings("unchecked")
-  public TimelineMetrics getMetricRecords(final Condition condition)
-    throws SQLException, IOException {
-
-    if (condition.isEmpty()) {
-      throw new SQLException("No filter criteria specified.");
-    }
-
-    Connection conn = getConnection();
-    PreparedStatement stmt = null;
-    TimelineMetrics metrics = new TimelineMetrics();
-
-    try {
-      stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
-
-      ResultSet rs = stmt.executeQuery();
-
-      while (rs.next()) {
-        TimelineMetric metric = getTimelineMetricFromResultSet(rs);
-
-        if (condition.isGrouped()) {
-          metrics.addOrMergeTimelineMetric(metric);
-        } else {
-          metrics.getMetrics().add(metric);
-        }
-      }
-
-    } finally {
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
-    return metrics;
-  }
-
-  public void saveHostAggregateRecords(Map<TimelineMetric,
-    MetricHostAggregate> hostAggregateMap, String phoenixTableName)
-    throws SQLException {
-
-    if (hostAggregateMap != null && !hostAggregateMap.isEmpty()) {
-      Connection conn = getConnection();
-      PreparedStatement stmt = null;
-
-      long start = System.currentTimeMillis();
-      int rowCount = 0;
-
-      try {
-        stmt = conn.prepareStatement(
-          String.format(UPSERT_AGGREGATE_RECORD_SQL, phoenixTableName));
-
-        for (Map.Entry<TimelineMetric, MetricHostAggregate> metricAggregate :
-          hostAggregateMap.entrySet()) {
-
-          TimelineMetric metric = metricAggregate.getKey();
-          MetricHostAggregate hostAggregate = metricAggregate.getValue();
-
-          rowCount++;
-          stmt.clearParameters();
-          stmt.setString(1, metric.getMetricName());
-          stmt.setString(2, metric.getHostName());
-          stmt.setString(3, metric.getAppId());
-          stmt.setString(4, metric.getInstanceId());
-          stmt.setLong(5, metric.getTimestamp());
-          stmt.setString(6, metric.getType());
-          stmt.setDouble(7, hostAggregate.getSum());
-          stmt.setDouble(8, hostAggregate.getMax());
-          stmt.setDouble(9, hostAggregate.getMin());
-          stmt.setDouble(10, hostAggregate.getNumberOfSamples());
-
-          try {
-          // TODO: Why is this exception swallowed?
-            stmt.executeUpdate();
-          } catch (SQLException sql) {
-            LOG.error(sql);
-          }
-
-          if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
-            conn.commit();
-            rowCount = 0;
-          }
-
-        }
-
-        conn.commit();
-
-      } finally {
-        if (stmt != null) {
-          try {
-            stmt.close();
-          } catch (SQLException e) {
-            // Ignore
-          }
-        }
-        if (conn != null) {
-          try {
-            conn.close();
-          } catch (SQLException sql) {
-            // Ignore
-          }
-        }
-      }
-
-      long end = System.currentTimeMillis();
-
-      if ((end - start) > 60000L) {
-        LOG.info("Time to save map: " + (end - start) + ", " +
-          "thread = " + Thread.currentThread().getClass());
-      }
-    }
-  }
-
-  /**
-   * Save Metric aggregate records.
-   *
-   * @throws SQLException
-   */
-  public void saveClusterAggregateRecords(
-    Map<TimelineClusterMetric, MetricClusterAggregate> records)
-    throws SQLException {
-
-      if (records == null || records.isEmpty()) {
-        LOG.debug("Empty aggregate records.");
-        return;
-      }
-
-      long start = System.currentTimeMillis();
-
-      Connection conn = getConnection();
-      PreparedStatement stmt = null;
-      try {
-        stmt = conn.prepareStatement(UPSERT_CLUSTER_AGGREGATE_SQL);
-        int rowCount = 0;
-
-        for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate>
-          aggregateEntry : records.entrySet()) {
-          TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
-          MetricClusterAggregate aggregate = aggregateEntry.getValue();
-
-          LOG.trace("clusterMetric = " + clusterMetric + ", " +
-            "aggregate = " + aggregate);
-
-          rowCount++;
-          stmt.clearParameters();
-          stmt.setString(1, clusterMetric.getMetricName());
-          stmt.setString(2, clusterMetric.getAppId());
-          stmt.setString(3, clusterMetric.getInstanceId());
-          stmt.setLong(4, clusterMetric.getTimestamp());
-          stmt.setString(5, clusterMetric.getType());
-          stmt.setDouble(6, aggregate.getSum());
-          stmt.setInt(7, aggregate.getNumberOfHosts());
-          stmt.setDouble(8, aggregate.getMax());
-          stmt.setDouble(9, aggregate.getMin());
-
-          try {
-            stmt.executeUpdate();
-          } catch (SQLException sql) {
-            // TODO: Why is this exception swallowed?
-            LOG.error(sql);
-          }
-
-          if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
-            conn.commit();
-            rowCount = 0;
-          }
-        }
-
-        conn.commit();
-
-      } finally {
-        if (stmt != null) {
-          try {
-            stmt.close();
-          } catch (SQLException e) {
-            // Ignore
-          }
-        }
-        if (conn != null) {
-          try {
-            conn.close();
-          } catch (SQLException sql) {
-            // Ignore
-          }
-        }
-      }
-      long end = System.currentTimeMillis();
-      if ((end - start) > 60000L) {
-        LOG.info("Time to save: " + (end - start) + ", " +
-          "thread = " + Thread.currentThread().getName());
-      }
-    }
-
-  /**
-   * Save Metric aggregate records.
-   *
-   * @throws SQLException
-   */
-  public void saveClusterAggregateHourlyRecords(
-    Map<TimelineClusterMetric, MetricHostAggregate> records,
-    String tableName)
-    throws SQLException {
-    if (records == null || records.isEmpty()) {
-      LOG.debug("Empty aggregate records.");
-      return;
-    }
-
-    long start = System.currentTimeMillis();
-
-    Connection conn = getConnection();
-    PreparedStatement stmt = null;
-    try {
-      stmt = conn.prepareStatement(String.format
-        (UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName));
-      int rowCount = 0;
-
-      for (Map.Entry<TimelineClusterMetric, MetricHostAggregate>
-        aggregateEntry : records.entrySet()) {
-        TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
-        MetricHostAggregate aggregate = aggregateEntry.getValue();
-
-        LOG.trace("clusterMetric = " + clusterMetric + ", " +
-          "aggregate = " + aggregate);
-
-        rowCount++;
-        stmt.clearParameters();
-        stmt.setString(1, clusterMetric.getMetricName());
-        stmt.setString(2, clusterMetric.getAppId());
-        stmt.setString(3, clusterMetric.getInstanceId());
-        stmt.setLong(4, clusterMetric.getTimestamp());
-        stmt.setString(5, clusterMetric.getType());
-        stmt.setDouble(6, aggregate.getSum());
-//        stmt.setInt(7, aggregate.getNumberOfHosts());
-        stmt.setLong(7, aggregate.getNumberOfSamples());
-        stmt.setDouble(8, aggregate.getMax());
-        stmt.setDouble(9, aggregate.getMin());
-
-        try {
-          stmt.executeUpdate();
-        } catch (SQLException sql) {
-          // we have no way to verify it works!!!
-          LOG.error(sql);
-        }
-
-        if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
-          conn.commit();
-          rowCount = 0;
-        }
-      }
-
-      conn.commit();
-
-    } finally {
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
-    long end = System.currentTimeMillis();
-    if ((end - start) > 60000L) {
-      LOG.info("Time to save: " + (end - start) + ", " +
-        "thread = " + Thread.currentThread().getName());
-    }
-  }
-
-
-  public TimelineMetrics getAggregateMetricRecords(final Condition condition)
-    throws SQLException {
-
-    if (condition.isEmpty()) {
-      throw new SQLException("No filter criteria specified.");
-    }
-
-    Connection conn = getConnection();
-    PreparedStatement stmt = null;
-    TimelineMetrics metrics = new TimelineMetrics();
-
-    try {
-      stmt = PhoenixTransactSQL.prepareGetAggregateSqlStmt(conn, condition);
-
-      ResultSet rs = stmt.executeQuery();
-
-      while (rs.next()) {
-        TimelineMetric metric = new TimelineMetric();
-        metric.setMetricName(rs.getString("METRIC_NAME"));
-        metric.setAppId(rs.getString("APP_ID"));
-        metric.setInstanceId(rs.getString("INSTANCE_ID"));
-        metric.setTimestamp(rs.getLong("SERVER_TIME"));
-        metric.setStartTime(rs.getLong("SERVER_TIME"));
-        Map<Long, Double> valueMap = new HashMap<Long, Double>();
-        valueMap.put(rs.getLong("SERVER_TIME"),
-          rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT"));
-        metric.setMetricValues(valueMap);
-
-        if (condition.isGrouped()) {
-          metrics.addOrMergeTimelineMetric(metric);
-        } else {
-          metrics.getMetrics().add(metric);
-        }
-      }
-
-    } finally {
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
-    LOG.info("Aggregate records size: " + metrics.getMetrics().size());
-    return metrics;
-  }
-}
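
A pattern that recurs in every upsert loop above: Phoenix buffers uncommitted mutations in client memory, so the accessor commits whenever the row count approaches PHOENIX_MAX_MUTATION_STATE_SIZE, then once more for the tail. A stripped-down sketch of that pattern; the one-column statement and row list are hypothetical:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

public class BatchCommitSketch {
  // Mirrors PHOENIX_MAX_MUTATION_STATE_SIZE above.
  static final int MAX_MUTATION_STATE_SIZE = 50000;

  static void upsertAll(Connection conn, PreparedStatement upsert,
                        List<Double> rows) throws SQLException {
    int rowCount = 0;
    for (Double value : rows) {
      upsert.clearParameters();
      upsert.setDouble(1, value);
      upsert.executeUpdate();
      // Commit before the client-side mutation buffer hits the limit.
      if (++rowCount >= MAX_MUTATION_STATE_SIZE - 1) {
        conn.commit();
        rowCount = 0;
      }
    }
    conn.commit();  // flush whatever remains of the batch
  }
}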

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
deleted file mode 100644
index 0d53f5f..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
+++ /dev/null
@@ -1,528 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Encapsulate all metrics related SQL queries.
- */
-public class PhoenixTransactSQL {
-
-  static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class);
-  // TODO: Configurable TTL values
-  /**
-   * Create table to store individual metric records.
-   */
-  public static final String CREATE_METRICS_TABLE_SQL = "CREATE TABLE IF NOT " +
-    "EXISTS METRIC_RECORD (METRIC_NAME VARCHAR, " +
-    "HOSTNAME VARCHAR, " +
-    "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
-    "APP_ID VARCHAR, " +
-    "INSTANCE_ID VARCHAR, " +
-    "START_TIME UNSIGNED_LONG, " +
-    "UNITS CHAR(20), " +
-    "METRIC_SUM DOUBLE, " +
-    "METRIC_COUNT UNSIGNED_INT, " +
-    "METRIC_MAX DOUBLE, " +
-    "METRIC_MIN DOUBLE, " +
-    "METRICS VARCHAR CONSTRAINT pk " +
-    "PRIMARY KEY (METRIC_NAME, HOSTNAME, SERVER_TIME, APP_ID, " +
-    "INSTANCE_ID)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
-    "TTL=%s, COMPRESSION='%s'";
-
-  public static final String CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL =
-    "CREATE TABLE IF NOT EXISTS METRIC_RECORD_HOURLY " +
-      "(METRIC_NAME VARCHAR, " +
-      "HOSTNAME VARCHAR, " +
-      "APP_ID VARCHAR, " +
-      "INSTANCE_ID VARCHAR, " +
-      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
-      "UNITS CHAR(20), " +
-      "METRIC_SUM DOUBLE," +
-      "METRIC_COUNT UNSIGNED_INT, " +
-      "METRIC_MAX DOUBLE," +
-      "METRIC_MIN DOUBLE CONSTRAINT pk " +
-      "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
-      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
-      "TTL=%s, COMPRESSION='%s'";
-
-  public static final String CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL =
-    "CREATE TABLE IF NOT EXISTS METRIC_RECORD_MINUTE " +
-      "(METRIC_NAME VARCHAR, " +
-      "HOSTNAME VARCHAR, " +
-      "APP_ID VARCHAR, " +
-      "INSTANCE_ID VARCHAR, " +
-      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
-      "UNITS CHAR(20), " +
-      "METRIC_SUM DOUBLE," +
-      "METRIC_COUNT UNSIGNED_INT, " +
-      "METRIC_MAX DOUBLE," +
-      "METRIC_MIN DOUBLE CONSTRAINT pk " +
-      "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
-      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," +
-      " COMPRESSION='%s'";
-
-  public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL =
-    "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE " +
-      "(METRIC_NAME VARCHAR, " +
-      "APP_ID VARCHAR, " +
-      "INSTANCE_ID VARCHAR, " +
-      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
-      "UNITS CHAR(20), " +
-      "METRIC_SUM DOUBLE, " +
-      "HOSTS_COUNT UNSIGNED_INT, " +
-      "METRIC_MAX DOUBLE, " +
-      "METRIC_MIN DOUBLE " +
-      "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
-      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
-      "TTL=%s, COMPRESSION='%s'";
-
-  public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL =
-    "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE_HOURLY " +
-      "(METRIC_NAME VARCHAR, " +
-      "APP_ID VARCHAR, " +
-      "INSTANCE_ID VARCHAR, " +
-      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
-      "UNITS CHAR(20), " +
-      "METRIC_SUM DOUBLE, " +
-      "METRIC_COUNT UNSIGNED_INT, " +
-      "METRIC_MAX DOUBLE, " +
-      "METRIC_MIN DOUBLE " +
-      "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
-      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
-      "TTL=%s, COMPRESSION='%s'";
-
-  /**
-   * Insert into metric records table.
-   */
-  public static final String UPSERT_METRICS_SQL = "UPSERT INTO %s " +
-    "(METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, " +
-    "UNITS, " +
-    "METRIC_SUM, " +
-    "METRIC_MAX, " +
-    "METRIC_MIN, " +
-    "METRIC_COUNT, " +
-    "METRICS) VALUES " +
-    "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
-
-  public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " +
-    "METRIC_AGGREGATE (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
-    "UNITS, " +
-    "METRIC_SUM, " +
-    "HOSTS_COUNT, " +
-    "METRIC_MAX, " +
-    "METRIC_MIN) " +
-    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
-
-  public static final String UPSERT_CLUSTER_AGGREGATE_TIME_SQL = "UPSERT INTO" +
-    " %s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
-    "UNITS, " +
-    "METRIC_SUM, " +
-    "METRIC_COUNT, " +
-    "METRIC_MAX, " +
-    "METRIC_MIN) " +
-    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
-
-
-  public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " +
-    "%s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
-    "SERVER_TIME, " +
-    "UNITS, " +
-    "METRIC_SUM, " +
-    "METRIC_MAX, " +
-    "METRIC_MIN," +
-    "METRIC_COUNT) " +
-    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
-
-  /**
-   * Retrieve a set of rows from metrics records table.
-   */
-  public static final String GET_METRIC_SQL = "SELECT %s METRIC_NAME, " +
-    "HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, UNITS, " +
-    "METRIC_SUM, " +
-    "METRIC_MAX, " +
-    "METRIC_MIN, " +
-    "METRIC_COUNT, " +
-    "METRICS " +
-    "FROM %s";
-
-  public static final String GET_METRIC_AGGREGATE_ONLY_SQL = "SELECT %s " +
-    "METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
-    "UNITS, " +
-    "METRIC_SUM, " +
-    "METRIC_MAX, " +
-    "METRIC_MIN, " +
-    "METRIC_COUNT " +
-    "FROM %s";
-
-  public static final String GET_CLUSTER_AGGREGATE_SQL = "SELECT %s " +
-    "METRIC_NAME, APP_ID, " +
-    "INSTANCE_ID, SERVER_TIME, " +
-    "UNITS, " +
-    "METRIC_SUM, " +
-    "HOSTS_COUNT, " +
-    "METRIC_MAX, " +
-    "METRIC_MIN " +
-    "FROM METRIC_AGGREGATE";
-
-  public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD";
-  public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME =
-    "METRIC_RECORD_MINUTE";
-  public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME =
-    "METRIC_RECORD_HOURLY";
-  public static final String METRICS_CLUSTER_AGGREGATE_TABLE_NAME =
-    "METRIC_AGGREGATE";
-  public static final String METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME =
-    "METRIC_AGGREGATE_HOURLY";
-  public static final String DEFAULT_TABLE_COMPRESSION = "SNAPPY";
-  public static final String DEFAULT_ENCODING = "FAST_DIFF";
-  public static final long NATIVE_TIME_RANGE_DELTA = 120000; // 2 minutes
-
-  /** Filter to optimize HBase scan by using file timestamps. This prevents
-   * a full table scan of metric records.
-   * @return Phoenix Hint String
-   */
-  public static String getNaiveTimeRangeHint(Long startTime, Long delta) {
-    return String.format("/*+ NATIVE_TIME_RANGE(%s) */", (startTime - delta));
-  }
-
-  public static PreparedStatement prepareGetMetricsSqlStmt(
-    Connection connection, Condition condition) throws SQLException {
-
-    if (condition.isEmpty()) {
-      throw new IllegalArgumentException("Condition is empty.");
-    }
-    String stmtStr;
-    if (condition.getStatement() != null) {
-      stmtStr = condition.getStatement();
-    } else {
-      stmtStr = String.format(GET_METRIC_SQL,
-        getNaiveTimeRangeHint(condition.getStartTime(), NATIVE_TIME_RANGE_DELTA),
-        METRICS_RECORD_TABLE_NAME);
-    }
-
-    StringBuilder sb = new StringBuilder(stmtStr);
-    sb.append(" WHERE ");
-    sb.append(condition.getConditionClause());
-    String orderByClause = condition.getOrderByClause();
-
-    if (orderByClause != null) {
-      sb.append(orderByClause);
-    } else {
-      sb.append(" ORDER BY METRIC_NAME, SERVER_TIME ");
-    }
-    if (condition.getLimit() != null) {
-      sb.append(" LIMIT ").append(condition.getLimit());
-    }
-
-    LOG.debug("SQL: " + sb.toString() + ", condition: " + condition);
-    PreparedStatement stmt = connection.prepareStatement(sb.toString());
-    int pos = 1;
-    if (condition.getMetricNames() != null) {
-      for (; pos <= condition.getMetricNames().size(); pos++) {
-        LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1));
-        stmt.setString(pos, condition.getMetricNames().get(pos - 1));
-      }
-    }
-    if (condition.getHostname() != null) {
-      LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname());
-      stmt.setString(pos++, condition.getHostname());
-    }
-    // TODO: Upper case all strings on POST
-    if (condition.getAppId() != null) {
-      // TODO: fix case of appId coming from host metrics
-      String appId = condition.getAppId();
-      if (!condition.getAppId().equals("HOST")) {
-        appId = appId.toLowerCase();
-      }
-      LOG.debug("Setting pos: " + pos + ", value: " + appId);
-      stmt.setString(pos++, appId);
-    }
-    if (condition.getInstanceId() != null) {
-      LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId());
-      stmt.setString(pos++, condition.getInstanceId());
-    }
-    if (condition.getStartTime() != null) {
-      LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime());
-      stmt.setLong(pos++, condition.getStartTime());
-    }
-    if (condition.getEndTime() != null) {
-      LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime());
-      stmt.setLong(pos, condition.getEndTime());
-    }
-    if (condition.getFetchSize() != null) {
-      stmt.setFetchSize(condition.getFetchSize());
-    }
-
-    return stmt;
-  }
-
-
-  public static PreparedStatement prepareGetAggregateSqlStmt(
-    Connection connection, Condition condition) throws SQLException {
-
-    if (condition.isEmpty()) {
-      throw new IllegalArgumentException("Condition is empty.");
-    }
-
-    StringBuilder sb = new StringBuilder(GET_CLUSTER_AGGREGATE_SQL);
-    sb.append(" WHERE ");
-    sb.append(condition.getConditionClause());
-    sb.append(" ORDER BY METRIC_NAME, SERVER_TIME");
-    if (condition.getLimit() != null) {
-      sb.append(" LIMIT ").append(condition.getLimit());
-    }
-
-    LOG.debug("SQL => " + sb.toString() + ", condition => " + condition);
-    PreparedStatement stmt = connection.prepareStatement(sb.toString());
-    int pos = 1;
-    if (condition.getMetricNames() != null) {
-      for (; pos <= condition.getMetricNames().size(); pos++) {
-        stmt.setString(pos, condition.getMetricNames().get(pos - 1));
-      }
-    }
-    // TODO: Upper case all strings on POST
-    if (condition.getAppId() != null) {
-      stmt.setString(pos++, condition.getAppId().toLowerCase());
-    }
-    if (condition.getInstanceId() != null) {
-      stmt.setString(pos++, condition.getInstanceId());
-    }
-    if (condition.getStartTime() != null) {
-      stmt.setLong(pos++, condition.getStartTime());
-    }
-    if (condition.getEndTime() != null) {
-      stmt.setLong(pos, condition.getEndTime());
-    }
-
-    return stmt;
-  }
-
-  static class Condition {
-    List<String> metricNames;
-    String hostname;
-    String appId;
-    String instanceId;
-    Long startTime;
-    Long endTime;
-    Integer limit;
-    boolean grouped;
-    boolean noLimit = false;
-    Integer fetchSize;
-    String statement;
-    Set<String> orderByColumns = new LinkedHashSet<String>();
-
-    Condition(List<String> metricNames, String hostname, String appId,
-              String instanceId, Long startTime, Long endTime, Integer limit,
-              boolean grouped) {
-      this.metricNames = metricNames;
-      this.hostname = hostname;
-      this.appId = appId;
-      this.instanceId = instanceId;
-      this.startTime = startTime;
-      this.endTime = endTime;
-      this.limit = limit;
-      this.grouped = grouped;
-    }
-
-    String getStatement() {
-      return statement;
-    }
-
-    void setStatement(String statement) {
-      this.statement = statement;
-    }
-
-    List<String> getMetricNames() {
-      return metricNames == null || metricNames.isEmpty() ? null : metricNames;
-    }
-
-    String getMetricsClause() {
-      StringBuilder sb = new StringBuilder("(");
-      if (metricNames != null) {
-        for (String name : metricNames) {
-          if (sb.length() != 1) {
-            sb.append(", ");
-          }
-          sb.append("?");
-        }
-        sb.append(")");
-        return sb.toString();
-      } else {
-        return null;
-      }
-    }
-
-    String getConditionClause() {
-      StringBuilder sb = new StringBuilder();
-      boolean appendConjunction = false;
-
-      if (getMetricNames() != null) {
-        sb.append("METRIC_NAME IN ");
-        sb.append(getMetricsClause());
-        appendConjunction = true;
-      }
-      if (appendConjunction) {
-        sb.append(" AND");
-      }
-      appendConjunction = false;
-      if (getHostname() != null) {
-        sb.append(" HOSTNAME = ?");
-        appendConjunction = true;
-      }
-      if (appendConjunction) {
-        sb.append(" AND");
-      }
-      appendConjunction = false;
-      if (getAppId() != null) {
-        sb.append(" APP_ID = ?");
-        appendConjunction = true;
-      }
-      if (appendConjunction) {
-        sb.append(" AND");
-      }
-      appendConjunction = false;
-      if (getInstanceId() != null) {
-        sb.append(" INSTANCE_ID = ?");
-        appendConjunction = true;
-      }
-      if (appendConjunction) {
-        sb.append(" AND");
-      }
-      appendConjunction = false;
-      if (getStartTime() != null) {
-        sb.append(" SERVER_TIME >= ?");
-        appendConjunction = true;
-      }
-      if (appendConjunction) {
-        sb.append(" AND");
-      }
-      if (getEndTime() != null) {
-        sb.append(" SERVER_TIME < ?");
-      }
-      return sb.toString();
-    }
-
-    String getHostname() {
-      return hostname == null || hostname.isEmpty() ? null : hostname;
-    }
-
-    String getAppId() {
-      return appId == null || appId.isEmpty() ? null : appId;
-    }
-
-    String getInstanceId() {
-      return instanceId == null || instanceId.isEmpty() ? null : instanceId;
-    }
-
-    /**
-     * Convert to millis.
-     */
-    Long getStartTime() {
-      if (startTime < 9999999999L) {
-        return startTime * 1000;
-      } else {
-        return startTime;
-      }
-    }
-
-    Long getEndTime() {
-      if (endTime < 9999999999L) {
-        return endTime * 1000;
-      } else {
-        return endTime;
-      }
-    }
-
-    void setNoLimit() {
-      this.noLimit = true;
-    }
-
-    Integer getLimit() {
-      if (noLimit) {
-        return null;
-      }
-      return limit == null ? PhoenixHBaseAccessor.RESULTSET_LIMIT : limit;
-    }
-
-    boolean isGrouped() {
-      return grouped;
-    }
-
-    boolean isEmpty() {
-      return (metricNames == null || metricNames.isEmpty())
-        && (hostname == null || hostname.isEmpty())
-        && (appId == null || appId.isEmpty())
-        && (instanceId == null || instanceId.isEmpty())
-        && startTime == null
-        && endTime == null;
-    }
-
-    Integer getFetchSize() {
-      return fetchSize;
-    }
-
-    void setFetchSize(Integer fetchSize) {
-      this.fetchSize = fetchSize;
-    }
-
-    void addOrderByColumn(String column) {
-      orderByColumns.add(column);
-    }
-
-    String getOrderByClause() {
-      String orderByStr = " ORDER BY ";
-      if (!orderByColumns.isEmpty()) {
-        StringBuilder sb = new StringBuilder(orderByStr);
-        for (String orderByColumn : orderByColumns) {
-          if (sb.length() != orderByStr.length()) {
-            sb.append(", ");
-          }
-          sb.append(orderByColumn);
-        }
-        sb.append(" ");
-        return sb.toString();
-      }
-      return null;
-    }
-
-    @Override
-    public String toString() {
-      return "Condition{" +
-        "metricNames=" + metricNames +
-        ", hostname='" + hostname + '\'' +
-        ", appId='" + appId + '\'' +
-        ", instanceId='" + instanceId + '\'' +
-        ", startTime=" + startTime +
-        ", endTime=" + endTime +
-        ", limit=" + limit +
-        ", grouped=" + grouped +
-        ", orderBy=" + orderByColumns +
-        ", noLimit=" + noLimit +
-        '}';
-    }
-  }
-}
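
Condition.getConditionClause() assembles a parameterized WHERE clause, and prepareGetMetricsSqlStmt() must then bind values in the same left-to-right order: metric names, hostname, appId, instanceId, startTime, endTime. A simplified standalone sketch of that assembly, with hypothetical filter values:

import java.util.Arrays;
import java.util.List;

public class ConditionClauseSketch {
  public static void main(String[] args) {
    // Hypothetical filters: two metric names, a host, and a time range.
    List<String> metricNames = Arrays.asList("cpu_user", "cpu_system");
    String hostname = "host1.example.com";
    Long startTime = 1409600000000L;
    Long endTime = 1409603600000L;

    StringBuilder sb = new StringBuilder();
    sb.append("METRIC_NAME IN (");
    for (int i = 0; i < metricNames.size(); i++) {
      sb.append(i == 0 ? "?" : ", ?");  // one placeholder per metric name
    }
    sb.append(")");
    if (hostname != null) sb.append(" AND HOSTNAME = ?");
    if (startTime != null) sb.append(" AND SERVER_TIME >= ?");
    if (endTime != null) sb.append(" AND SERVER_TIME < ?");

    // Prints:
    // METRIC_NAME IN (?, ?) AND HOSTNAME = ? AND SERVER_TIME >= ? AND SERVER_TIME < ?
    System.out.println(sb);
    // PreparedStatement parameters are then bound in exactly this order.
  }
}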

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
deleted file mode 100644
index d227993..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-public class TimelineClusterMetric {
-  private String metricName;
-  private String appId;
-  private String instanceId;
-  private long timestamp;
-  private String type;
-
-  TimelineClusterMetric(String metricName, String appId, String instanceId,
-                        long timestamp, String type) {
-    this.metricName = metricName;
-    this.appId = appId;
-    this.instanceId = instanceId;
-    this.timestamp = timestamp;
-    this.type = type;
-  }
-
-  String getMetricName() {
-    return metricName;
-  }
-
-  String getAppId() {
-    return appId;
-  }
-
-  String getInstanceId() {
-    return instanceId;
-  }
-
-  long getTimestamp() {
-    return timestamp;
-  }
-
-  String getType() { return type; }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    TimelineClusterMetric that = (TimelineClusterMetric) o;
-
-    if (timestamp != that.timestamp) return false;
-    if (appId != null ? !appId.equals(that.appId) : that.appId != null)
-      return false;
-    if (instanceId != null ? !instanceId.equals(that.instanceId) : that.instanceId != null)
-      return false;
-    if (!metricName.equals(that.metricName)) return false;
-
-    return true;
-  }
-
-  public boolean equalsExceptTime(TimelineClusterMetric metric) {
-    if (!metricName.equals(metric.metricName)) return false;
-    if (!appId.equals(metric.appId)) return false;
-    if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
-      return false;
-
-    return true;
-  }
-  @Override
-  public int hashCode() {
-    int result = metricName.hashCode();
-    result = 31 * result + (appId != null ? appId.hashCode() : 0);
-    result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
-    result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
-    return result;
-  }
-
-  @Override
-  public String toString() {
-    return "TimelineClusterMetric{" +
-      "metricName='" + metricName + '\'' +
-      ", appId='" + appId + '\'' +
-      ", instanceId='" + instanceId + '\'' +
-      ", timestamp=" + timestamp +
-      '}';
-  }
-}
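
TimelineClusterMetric carries two equality notions: equals()/hashCode() include the timestamp, so each time slice is a distinct map key, while equalsExceptTime() ignores it, letting a sorted result-set scan detect when rows switch to a new logical metric. A same-package sketch of the difference (the constructor is package-private); the values are hypothetical:

package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;

public class ClusterMetricEqualitySketch {
  public static void main(String[] args) {
    TimelineClusterMetric a =
      new TimelineClusterMetric("cpu_user", "HOST", null, 1000L, "Number");
    TimelineClusterMetric b =
      new TimelineClusterMetric("cpu_user", "HOST", null, 2000L, "Number");

    System.out.println(a.equals(b));           // false: timestamps differ
    System.out.println(a.equalsExceptTime(b)); // true: same logical metric
  }
}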

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
deleted file mode 100644
index cab154b..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.Map;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
-
-public class TimelineMetricAggregator extends AbstractTimelineAggregator {
-  private static final Log LOG = LogFactory.getLog
-    (TimelineMetricAggregator.class);
-
-  private final String checkpointLocation;
-  private final Long sleepIntervalMillis;
-  private final Integer checkpointCutOffMultiplier;
-  private final String hostAggregatorDisabledParam;
-  private final String tableName;
-  private final String outputTableName;
-  private final Long nativeTimeRangeDelay;
-
-  public TimelineMetricAggregator(PhoenixHBaseAccessor hBaseAccessor,
-                                  Configuration metricsConf,
-                                  String checkpointLocation,
-                                  Long sleepIntervalMillis,
-                                  Integer checkpointCutOffMultiplier,
-                                  String hostAggregatorDisabledParam,
-                                  String tableName,
-                                  String outputTableName,
-                                  Long nativeTimeRangeDelay) {
-    super(hBaseAccessor, metricsConf);
-    this.checkpointLocation = checkpointLocation;
-    this.sleepIntervalMillis = sleepIntervalMillis;
-    this.checkpointCutOffMultiplier = checkpointCutOffMultiplier;
-    this.hostAggregatorDisabledParam = hostAggregatorDisabledParam;
-    this.tableName = tableName;
-    this.outputTableName = outputTableName;
-    this.nativeTimeRangeDelay =  nativeTimeRangeDelay;
-  }
-
-  @Override
-  protected String getCheckpointLocation() {
-    return checkpointLocation;
-  }
-
-  @Override
-  protected void aggregate(ResultSet rs, long startTime, long endTime)
-    throws IOException, SQLException {
-    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
-      aggregateMetricsFromResultSet(rs);
-
-    LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
-    hBaseAccessor.saveHostAggregateRecords(hostAggregateMap,
-      outputTableName);
-  }
-
-  @Override
-  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
-    Condition condition = new Condition(null, null, null, null, startTime,
-      endTime, null, true);
-    condition.setNoLimit();
-    condition.setFetchSize(resultsetFetchSize);
-    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
-      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, nativeTimeRangeDelay),
-      tableName));
-    condition.addOrderByColumn("METRIC_NAME");
-    condition.addOrderByColumn("HOSTNAME");
-    condition.addOrderByColumn("APP_ID");
-    condition.addOrderByColumn("INSTANCE_ID");
-    condition.addOrderByColumn("SERVER_TIME");
-    return condition;
-  }
-
-  private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet
-      (ResultSet rs) throws IOException, SQLException {
-    TimelineMetric existingMetric = null;
-    MetricHostAggregate hostAggregate = null;
-    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
-      new HashMap<TimelineMetric, MetricHostAggregate>();
-
-    while (rs.next()) {
-      TimelineMetric currentMetric =
-        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
-      MetricHostAggregate currentHostAggregate =
-        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
-
-      if (existingMetric == null) {
-        // First row
-        existingMetric = currentMetric;
-        hostAggregate = new MetricHostAggregate();
-        hostAggregateMap.put(currentMetric, hostAggregate);
-      }
-
-      if (existingMetric.equalsExceptTime(currentMetric)) {
-        // Recalculate totals with current metric
-        hostAggregate.updateAggregates(currentHostAggregate);
-      } else {
-        // Switched over to a new metric - save existing - create new aggregate
-        hostAggregate = new MetricHostAggregate();
-        hostAggregate.updateAggregates(currentHostAggregate);
-        hostAggregateMap.put(currentMetric, hostAggregate);
-        existingMetric = currentMetric;
-      }
-    }
-    return hostAggregateMap;
-  }
-
-  @Override
-  protected Long getSleepIntervalMillis() {
-    return sleepIntervalMillis;
-  }
-
-  @Override
-  protected Integer getCheckpointCutOffMultiplier() {
-    return checkpointCutOffMultiplier;
-  }
-
-  @Override
-  protected boolean isDisabled() {
-    return metricsConf.getBoolean(hostAggregatorDisabledParam, false);
-  }
-}

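The aggregate() path above depends on the ORDER BY columns added in prepareMetricQueryCondition: rows for the same metric arrive consecutively, so aggregateMetricsFromResultSet can fold them in a single pass without a map lookup per row. Below is a self-contained sketch of that fold, using plain arrays as hypothetical stand-ins for the sorted ResultSet and a double[] pair as a stand-in for MetricHostAggregate.

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Standalone sketch of the single-pass fold used above; the arrays play
    // the role of the ResultSet, which must arrive sorted by metric key.
    public class SortedFoldSketch {
      // out maps metric key -> {sum, sampleCount}
      public static Map<String, double[]> fold(String[] keys, double[] values) {
        Map<String, double[]> out = new LinkedHashMap<String, double[]>();
        String current = null;
        double[] agg = null;
        for (int i = 0; i < keys.length; i++) {
          if (!keys[i].equals(current)) {  // switched over to a new metric
            current = keys[i];
            agg = new double[] {0, 0};
            out.put(current, agg);
          }
          agg[0] += values[i];             // recalculate totals with this row
          agg[1] += 1;
        }
        return out;
      }

      public static void main(String[] args) {
        Map<String, double[]> m = fold(
            new String[] {"cpu_user", "cpu_user", "mem_free"},
            new double[] {10, 6, 42});
        System.out.println(m.get("cpu_user")[0]); // 16.0
      }
    }
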
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
deleted file mode 100644
index 8b10079..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import org.apache.commons.io.FilenameUtils;
-import org.apache.hadoop.conf.Configuration;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_DISABLED;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_DISABLED;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
-
-/**
- *
- */
-public class TimelineMetricAggregatorFactory {
-  private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE =
-    "timeline-metrics-host-aggregator-checkpoint";
-  private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE =
-    "timeline-metrics-host-aggregator-hourly-checkpoint";
-
-  public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
-    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
-
-    String checkpointDir = metricsConf.get(
-      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-    String checkpointLocation = FilenameUtils.concat(checkpointDir,
-      MINUTE_AGGREGATE_CHECKPOINT_FILE);
-    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
-      (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l));  // 5 mins
-
-    int checkpointCutOffMultiplier = metricsConf.getInt
-      (HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3);
-    String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED;
-
-    String inputTableName = METRICS_RECORD_TABLE_NAME;
-    String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-
-    return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
-      checkpointLocation,
-      sleepIntervalMillis,
-      checkpointCutOffMultiplier,
-      hostAggregatorDisabledParam,
-      inputTableName,
-      outputTableName,
-      120000l);
-  }
-
-  public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
-    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
-
-    String checkpointDir = metricsConf.get(
-      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-    String checkpointLocation = FilenameUtils.concat(checkpointDir,
-      MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE);
-    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
-      (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
-
-    int checkpointCutOffMultiplier = metricsConf.getInt
-      (HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
-    String hostAggregatorDisabledParam = HOST_AGGREGATOR_HOUR_DISABLED;
-
-    String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-    String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
-
-    return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
-      checkpointLocation,
-      sleepIntervalMillis,
-      checkpointCutOffMultiplier,
-      hostAggregatorDisabledParam,
-      inputTableName,
-      outputTableName,
-      3600000l);
-  }
-
-
-}

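For the factory defaults above: the minute aggregator wakes every 300 seconds (5 minutes) and rolls the precision table into the minute table, and the hourly one wakes every 3600 seconds and rolls minute data into the hourly table. The sketch below shows how such a default is resolved; the literal property key is a placeholder, since the real key is the string value of HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, which this hunk does not show.

    import static java.util.concurrent.TimeUnit.SECONDS;

    import org.apache.hadoop.conf.Configuration;

    // Sketch of the defaulting logic in the factory above. The literal key is
    // a placeholder for the HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL constant
    // from TimelineMetricConfiguration, whose value is not in this hunk.
    public class AggregatorConfigSketch {
      public static void main(String[] args) {
        Configuration metricsConf = new Configuration();
        // Uncomment to override the 5-minute default:
        // metricsConf.setLong("placeholder.minute.sleep.interval", 600L);
        long sleepIntervalMillis = SECONDS.toMillis(
            metricsConf.getLong("placeholder.minute.sleep.interval", 300L));
        System.out.println(sleepIntervalMillis); // 300000 by default
      }
    }
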
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
deleted file mode 100644
index 654c188..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-
-import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
-
-/**
- * Aggregates a metric across all hosts in the cluster. Reads metrics from
- * the precision table and saves into the aggregate.
- */
-public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
-  private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregator.class);
-  private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE =
-    "timeline-metrics-cluster-aggregator-checkpoint";
-  private final String checkpointLocation;
-  private final Long sleepIntervalMillis;
-  public final int timeSliceIntervalMillis;
-  private final Integer checkpointCutOffMultiplier;
-
-  public TimelineMetricClusterAggregator(PhoenixHBaseAccessor hBaseAccessor,
-                                         Configuration metricsConf) {
-    super(hBaseAccessor, metricsConf);
-
-    String checkpointDir = metricsConf.get(
-      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-
-    checkpointLocation = FilenameUtils.concat(checkpointDir,
-      CLUSTER_AGGREGATOR_CHECKPOINT_FILE);
-
-    sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
-      (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 120l));
-    timeSliceIntervalMillis = (int)SECONDS.toMillis(metricsConf.getInt
-      (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 15));
-    checkpointCutOffMultiplier =
-      metricsConf.getInt(CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
-  }
-
-  @Override
-  protected String getCheckpointLocation() {
-    return checkpointLocation;
-  }
-
-  @Override
-  protected void aggregate(ResultSet rs, long startTime, long endTime)
-    throws SQLException, IOException {
-    List<Long[]> timeSlices = getTimeSlices(startTime, endTime);
-    Map<TimelineClusterMetric, MetricClusterAggregate>
-      aggregateClusterMetrics = aggregateMetricsFromResultSet(rs, timeSlices);
-
-    LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
-    hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
-  }
-
-  @Override
-  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
-    Condition condition = new Condition(null, null, null, null, startTime,
-      endTime, null, true);
-    condition.setNoLimit();
-    condition.setFetchSize(resultsetFetchSize);
-    condition.setStatement(String.format(GET_METRIC_SQL,
-      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
-      METRICS_RECORD_TABLE_NAME));
-    condition.addOrderByColumn("METRIC_NAME");
-    condition.addOrderByColumn("APP_ID");
-    condition.addOrderByColumn("INSTANCE_ID");
-    condition.addOrderByColumn("SERVER_TIME");
-    return condition;
-  }
-
-  private List<Long[]> getTimeSlices(long startTime, long endTime) {
-    List<Long[]> timeSlices = new ArrayList<Long[]>();
-    long sliceStartTime = startTime;
-    while (sliceStartTime < endTime) {
-      timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis});
-      sliceStartTime += timeSliceIntervalMillis;
-    }
-    return timeSlices;
-  }
-
-  private Map<TimelineClusterMetric, MetricClusterAggregate>
-  aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
-    throws SQLException, IOException {
-    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
-      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
-    // Create time slices
-
-    while (rs.next()) {
-      TimelineMetric metric =
-        PhoenixHBaseAccessor.getTimelineMetricFromResultSet(rs);
-
-      Map<TimelineClusterMetric, Double> clusterMetrics =
-        sliceFromTimelineMetric(metric, timeSlices);
-
-      if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
-        for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
-            clusterMetrics.entrySet()) {
-          TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
-          MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
-          Double avgValue = clusterMetricEntry.getValue();
-
-          if (aggregate == null) {
-            aggregate = new MetricClusterAggregate(avgValue, 1, null,
-              avgValue, avgValue);
-            aggregateClusterMetrics.put(clusterMetric, aggregate);
-          } else {
-            aggregate.updateSum(avgValue);
-            aggregate.updateNumberOfHosts(1);
-            aggregate.updateMax(avgValue);
-            aggregate.updateMin(avgValue);
-          }
-        }
-      }
-    }
-    return aggregateClusterMetrics;
-  }
-
-  @Override
-  protected Long getSleepIntervalMillis() {
-    return sleepIntervalMillis;
-  }
-
-  @Override
-  protected Integer getCheckpointCutOffMultiplier() {
-    return checkpointCutOffMultiplier;
-  }
-
-  @Override
-  protected boolean isDisabled() {
-    return metricsConf.getBoolean(CLUSTER_AGGREGATOR_MINUTE_DISABLED, false);
-  }
-
-  private Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
-        TimelineMetric timelineMetric, List<Long[]> timeSlices) {
-
-    if (timelineMetric.getMetricValues().isEmpty()) {
-      return null;
-    }
-
-    Map<TimelineClusterMetric, Double> timelineClusterMetricMap =
-      new HashMap<TimelineClusterMetric, Double>();
-
-    for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) {
-      // TODO: investigate null values - pre filter
-      if (metric.getValue() == null) {
-        continue;
-      }
-      Long timestamp = getSliceTimeForMetric(timeSlices,
-                       Long.parseLong(metric.getKey().toString()));
-      if (timestamp != -1) {
-        // Metric is within desired time range
-        TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
-          timelineMetric.getMetricName(),
-          timelineMetric.getAppId(),
-          timelineMetric.getInstanceId(),
-          timestamp,
-          timelineMetric.getType());
-        if (!timelineClusterMetricMap.containsKey(clusterMetric)) {
-          timelineClusterMetricMap.put(clusterMetric, metric.getValue());
-        } else {
-          Double oldValue = timelineClusterMetricMap.get(clusterMetric);
-          Double newValue = (oldValue + metric.getValue()) / 2;
-          timelineClusterMetricMap.put(clusterMetric, newValue);
-        }
-      }
-    }
-
-    return timelineClusterMetricMap;
-  }
-
-  /**
-   * Return beginning of the time slice into which the metric fits.
-   */
-  private Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) {
-    for (Long[] timeSlice : timeSlices) {
-      if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) {
-        return timeSlice[0];
-      }
-    }
-    return -1l;
-  }
-
-}

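Two details of the cluster aggregator above are worth spelling out. getTimeSlices cuts [startTime, endTime) into fixed windows (15 seconds by default), and getSliceTimeForMetric maps each datapoint to the start of the window containing it. Note also that when two datapoints land in the same slice, sliceFromTimelineMetric averages them pairwise ((old + new) / 2), which weights later points more heavily than a true arithmetic mean would. A self-contained sketch of the slicing and lookup:

    import java.util.ArrayList;
    import java.util.List;

    // Standalone sketch of the time-slice logic above: cut [start, end) into
    // fixed windows and map a timestamp to the start of its window.
    public class TimeSliceSketch {
      static List<long[]> getTimeSlices(long start, long end, long sliceMillis) {
        List<long[]> slices = new ArrayList<long[]>();
        for (long t = start; t < end; t += sliceMillis) {
          slices.add(new long[] {t, t + sliceMillis});
        }
        return slices;
      }

      // Returns the slice start containing ts, or -1 if out of range.
      static long sliceTimeFor(List<long[]> slices, long ts) {
        for (long[] s : slices) {
          if (ts >= s[0] && ts < s[1]) return s[0];
        }
        return -1L;
      }

      public static void main(String[] args) {
        List<long[]> slices = getTimeSlices(0, 60_000, 15_000); // four slices
        System.out.println(sliceTimeFor(slices, 31_500)); // 30000
        System.out.println(sliceTimeFor(slices, 60_000)); // -1, end exclusive
      }
    }
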
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
deleted file mode 100644
index 7764ea3..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline;
-
-import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.Map;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
-
-public class TimelineMetricClusterAggregatorHourly extends
-  AbstractTimelineAggregator {
-  private static final Log LOG = LogFactory.getLog
-    (TimelineMetricClusterAggregatorHourly.class);
-  private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE =
-    "timeline-metrics-cluster-aggregator-hourly-checkpoint";
-  private final String checkpointLocation;
-  private final long sleepIntervalMillis;
-  private final Integer checkpointCutOffMultiplier;
-  private long checkpointCutOffIntervalMillis;
-  private static final Long NATIVE_TIME_RANGE_DELTA = 3600000l; // 1 hour
-
-  public TimelineMetricClusterAggregatorHourly(
-    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
-    super(hBaseAccessor, metricsConf);
-
-    String checkpointDir = metricsConf.get(
-      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-
-    checkpointLocation = FilenameUtils.concat(checkpointDir,
-      CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE);
-
-    sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
-      (CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
-    checkpointCutOffIntervalMillis =  SECONDS.toMillis(metricsConf.getLong
-      (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL, 7200l));
-    checkpointCutOffMultiplier = metricsConf.getInt
-      (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
-  }
-
-  @Override
-  protected String getCheckpointLocation() {
-    return checkpointLocation;
-  }
-
-  @Override
-  protected void aggregate(ResultSet rs, long startTime, long endTime)
-    throws SQLException, IOException {
-      Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
-        aggregateMetricsFromResultSet(rs);
-
-    LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
-    hBaseAccessor.saveClusterAggregateHourlyRecords(hostAggregateMap,
-      METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
-  }
-
-  @Override
-  protected Condition prepareMetricQueryCondition(long startTime,
-                                                  long endTime) {
-    Condition condition = new Condition(null, null, null, null, startTime,
-      endTime, null, true);
-    condition.setNoLimit();
-    condition.setFetchSize(resultsetFetchSize);
-    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
-      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA)));
-    condition.addOrderByColumn("METRIC_NAME");
-    condition.addOrderByColumn("APP_ID");
-    condition.addOrderByColumn("INSTANCE_ID");
-    condition.addOrderByColumn("SERVER_TIME");
-    return condition;
-  }
-
-  private Map<TimelineClusterMetric, MetricHostAggregate>
-  aggregateMetricsFromResultSet(ResultSet rs) throws IOException, SQLException {
-
-    TimelineClusterMetric existingMetric = null;
-    MetricHostAggregate hostAggregate = null;
-    Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
-      new HashMap<TimelineClusterMetric, MetricHostAggregate>();
-
-    while (rs.next()) {
-      TimelineClusterMetric currentMetric =
-        getTimelineMetricClusterKeyFromResultSet(rs);
-      MetricClusterAggregate currentHostAggregate =
-        getMetricClusterAggregateFromResultSet(rs);
-
-      if (existingMetric == null) {
-        // First row
-        existingMetric = currentMetric;
-        hostAggregate = new MetricHostAggregate();
-        hostAggregateMap.put(currentMetric, hostAggregate);
-      }
-
-      if (existingMetric.equalsExceptTime(currentMetric)) {
-        // Recalculate totals with current metric
-        updateAggregatesFromHost(hostAggregate, currentHostAggregate);
-
-      } else {
-        // Switched over to a new metric - save existing
-        hostAggregate = new MetricHostAggregate();
-        updateAggregatesFromHost(hostAggregate, currentHostAggregate);
-        hostAggregateMap.put(currentMetric, hostAggregate);
-        existingMetric = currentMetric;
-      }
-
-    }
-
-    return hostAggregateMap;
-  }
-
-  private void updateAggregatesFromHost(
-    MetricHostAggregate agg,
-    MetricClusterAggregate currentClusterAggregate) {
-    agg.updateMax(currentClusterAggregate.getMax());
-    agg.updateMin(currentClusterAggregate.getMin());
-    agg.updateSum(currentClusterAggregate.getSum());
-    agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts());
-  }
-
-  @Override
-  protected Long getSleepIntervalMillis() {
-    return sleepIntervalMillis;
-  }
-
-  @Override
-  protected Integer getCheckpointCutOffMultiplier() {
-    return checkpointCutOffMultiplier;
-  }
-
-  @Override
-  protected Long getCheckpointCutOffIntervalMillis() {
-    return checkpointCutOffIntervalMillis;
-  }
-
-  @Override
-  protected boolean isDisabled() {
-    return metricsConf.getBoolean(CLUSTER_AGGREGATOR_HOUR_DISABLED, false);
-  }
-
-
-}

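In the hourly roll-up above, updateAggregatesFromHost folds each minute-level cluster row into the hourly aggregate, counting that row's host count as its number of samples; the accumulated sum and sample totals then support a host-weighted mean (sum divided by samples). A small sketch with a hypothetical stand-in for MetricHostAggregate as it is used there:

    // HourAggSketch is a hypothetical stand-in for MetricHostAggregate as
    // driven by updateAggregatesFromHost above.
    public class HourlyRollupSketch {
      static final class HourAggSketch {
        double sum = 0;
        double max = Double.NEGATIVE_INFINITY;
        double min = Double.POSITIVE_INFINITY;
        long samples = 0;

        // Mirrors updateAggregatesFromHost: max/min/sum from the minute row,
        // with the row's host count accumulated as the sample count.
        void update(double rowSum, double rowMax, double rowMin, int hosts) {
          if (rowMax > max) max = rowMax;
          if (rowMin < min) min = rowMin;
          sum += rowSum;
          samples += hosts;
        }
      }

      public static void main(String[] args) {
        HourAggSketch agg = new HourAggSketch();
        agg.update(30.0, 12.0, 3.0, 3); // minute row covering 3 hosts
        agg.update(20.0, 11.0, 4.0, 2); // minute row covering 2 hosts
        System.out.println(agg.sum / agg.samples); // 10.0: host-weighted mean
      }
    }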