http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/RestMetricsSender.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/RestMetricsSender.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/RestMetricsSender.java
new file mode 100644
index 0000000..5130ae3
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/RestMetricsSender.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.loadsimulator.net;
+
+import com.google.common.base.Stopwatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.ProtocolException;
+
+/**
+ * Implements MetricsSender and provides a way of pushing metrics to the
+ * application metrics history service using a REST endpoint.
+ */
+public class RestMetricsSender implements MetricsSender {
+  private final static Logger LOG =
+    LoggerFactory.getLogger(RestMetricsSender.class);
+
+  private final static String COLLECTOR_URL =
+    "http://%s:8188/ws/v1/timeline/metrics";
+  private final String collectorServiceAddress;
+
+  /**
+   * Creates unconnected RestMetricsSender with endpoint configured as
+   * http://${metricsHost}:8188/ws/v1/timeline/metrics,
+   * where ${metricsHost} is specified by metricHost param.
+   *
+   * @param metricsHost the hostname used to access the application metrics
+   *                    history service.
+   */
+  public RestMetricsSender(String metricsHost) {
+    collectorServiceAddress = String.format(COLLECTOR_URL, metricsHost);
+  }
+
+  /**
+   * Push metrics to the REST endpoint. A connection is opened and closed on
+   * every call.
+   *
+   * @param payload the payload with metrics to be sent to metrics service
+   * @return the response message, either an acknowledgement or an error;
+   *         empty on exception
+   */
+  @Override
+  public String pushMetrics(String payload) {
+    String responseString = "";
+    UrlService svc = null;
+    Stopwatch timer = new Stopwatch().start();
+
+    try {
+      LOG.info("server: {}", collectorServiceAddress);
+
+      svc = getConnectedUrlService();
+      responseString = svc.send(payload);
+
+      timer.stop();
+      LOG.info("http response time: " + timer.elapsedMillis() + " ms");
+
+      if (responseString.length() > 0) {
+        LOG.debug("POST response from server: " + responseString);
+      }
+    } catch (MalformedURLException e) {
+      LOG.error("", e);
+    } catch (ProtocolException e) {
+      LOG.error("", e);
+    } catch (IOException e) {
+      LOG.error("", e);
+    } finally {
+      if (svc != null) {
+        svc.disconnect();
+      }
+    }
+
+    return responseString;
+  }
+
+  /**
+   * Relaxed to protected for testing.
+   */
+  protected UrlService getConnectedUrlService() throws IOException {
+    return UrlService.newConnection(collectorServiceAddress);
+  }
+
+}
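
For context, a minimal usage sketch of the sender above; the host name and payload are made-up values, and MetricsSender is the interface this class implements:

    // Hypothetical usage: push an empty metrics envelope to a collector host.
    public class RestMetricsSenderExample {
      public static void main(String[] args) {
        MetricsSender sender = new RestMetricsSender("metrics-host.example.com");
        // Payload shape is illustrative; the collector expects timeline metrics JSON.
        String response = sender.pushMetrics("{\"metrics\":[]}");
        System.out.println("Collector response: " + response);
      }
    }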

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/StdOutMetricsSender.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/StdOutMetricsSender.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/StdOutMetricsSender.java
new file mode 100644
index 0000000..aeb4ca8
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/StdOutMetricsSender.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.loadsimulator.net;
+
+import java.io.PrintStream;
+
+/**
+ * StdOutMetricsSender dumps metrics to a given PrintStream. It is useful for
+ * testing.
+ */
+public class StdOutMetricsSender implements MetricsSender {
+  public final PrintStream out;
+  private String metricsHostName;
+
+  /**
+   * Creates a new StdOutMetricsSender with the specified hostname (only used
+   * in messages) that sends output to System.out.
+   *
+   * @param metricsHostName a name used in printed messages
+   */
+  public StdOutMetricsSender(String metricsHostName) {
+    this(metricsHostName, System.out);
+  }
+
+  /**
+   * Creates a new StdOutMetricsSender with the specified hostname (only used
+   * in messages) and a PrintStream used as the output.
+   *
+   * @param metricsHostName a name used in printed messages
+   * @param out             PrintStream that the sender will write to; can be
+   *                        System.out
+   */
+  public StdOutMetricsSender(String metricsHostName, PrintStream out) {
+    this.metricsHostName = metricsHostName;
+    this.out = out;
+  }
+
+  @Override
+  public String pushMetrics(String payload) {
+    out.println("Sending to " + metricsHostName + ": " + payload);
+
+    return "OK";
+  }
+}
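
A quick sketch of swapping the REST sender for the stdout variant during tests; the host name is illustrative:

    public class StdOutSenderExample {
      public static void main(String[] args) {
        // Prints the payload instead of POSTing it; pushMetrics() returns "OK".
        MetricsSender sender = new StdOutMetricsSender("test-host");
        sender.pushMetrics("{\"metrics\":[]}");
      }
    }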

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/UrlService.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/UrlService.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/UrlService.java
new file mode 100644
index 0000000..7402438
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/UrlService.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.net;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+public class UrlService {
+
+  public static final int CONNECT_TIMEOUT = 20000;
+  public static final int READ_TIMEOUT = 20000;
+  private final String address;
+  private HttpURLConnection conn;
+
+  private UrlService(String address) {
+    this.address = address;
+  }
+
+  /**
+   * Returns a new UrlService connected to specified address.
+   *
+   * @param address the URL of the metrics endpoint
+   * @return a connected UrlService
+   * @throws IOException if the connection cannot be opened
+   */
+  public static UrlService newConnection(String address) throws IOException {
+    UrlService svc = new UrlService(address);
+    svc.connect();
+
+    return svc;
+  }
+
+  public HttpURLConnection connect() throws IOException {
+    URL url = new URL(address);
+    conn = (HttpURLConnection) url.openConnection();
+
+    //TODO: make timeouts configurable
+    conn.setConnectTimeout(CONNECT_TIMEOUT);
+    conn.setReadTimeout(READ_TIMEOUT);
+    conn.setDoInput(true);
+    conn.setDoOutput(true);
+    conn.setRequestMethod("POST");
+    conn.setRequestProperty("Content-Type", "application/json");
+    conn.setRequestProperty("Accept", "*/*");
+
+    return conn;
+  }
+
+  public String send(String payload) throws IOException {
+    if (conn == null)
+      throw new IllegalStateException("Cannot use unconnected UrlService");
+    write(payload);
+
+    return read();
+  }
+
+  private String read() throws IOException {
+    StringBuilder response = new StringBuilder();
+
+    BufferedReader br = new BufferedReader(new InputStreamReader(
+      conn.getInputStream()));
+    String line = null;
+    while ((line = br.readLine()) != null) {
+      response.append(line);
+    }
+    br.close();
+
+    return response.toString();
+  }
+
+  private void write(String payload) throws IOException {
+    OutputStreamWriter writer = new OutputStreamWriter(conn.getOutputStream(),
+      "UTF-8");
+    writer.write(payload);
+    writer.close();
+  }
+
+  public void disconnect() {
+    conn.disconnect();
+  }
+}
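
A usage sketch for the helper above; the endpoint URL is hypothetical. newConnection() opens the POST connection up front, and the caller owns disconnect():

    public class UrlServiceExample {
      public static void main(String[] args) throws java.io.IOException {
        UrlService svc = UrlService.newConnection(
            "http://metrics-host.example.com:8188/ws/v1/timeline/metrics");
        try {
          // send() writes the body, then reads the whole response into a string.
          System.out.println(svc.send("{\"metrics\":[]}"));
        } finally {
          svc.disconnect();
        }
      }
    }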

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/Json.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/Json.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/Json.java
new file mode 100644
index 0000000..61a3903
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/Json.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.util;
+
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonMethod;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+import java.io.IOException;
+
+/**
+ * Small wrapper that configures the ObjectMapper with some defaults.
+ */
+public class Json {
+  private ObjectMapper myObjectMapper;
+
+  /**
+   * Creates default Json ObjectMapper that maps fields.
+   */
+  public Json() {
+    this(false);
+  }
+
+  /**
+   * Creates a Json ObjectMapper that maps fields and optionally pretty
+   * prints the serialized objects.
+   *
+   * @param pretty a flag - if true the output will be pretty printed.
+   */
+  public Json(boolean pretty) {
+    myObjectMapper = new ObjectMapper();
+    myObjectMapper.setVisibility(JsonMethod.FIELD,
+      JsonAutoDetect.Visibility.ANY);
+    if (pretty) {
+      myObjectMapper.configure(SerializationConfig.Feature.INDENT_OUTPUT,
+        true);
+    }
+  }
+
+  public String serialize(Object o) throws IOException {
+    return myObjectMapper.writeValueAsString(o);
+  }
+
+  public <T> T deserialize(String content, Class<T> paramClass)
+    throws IOException {
+    return myObjectMapper.readValue(content, paramClass);
+  }
+
+}
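
Because the wrapper enables field visibility, plain private fields serialize without getters. A small round-trip sketch (the Point class is made up):

    import java.io.IOException;

    public class JsonExample {
      // Fields are picked up directly thanks to JsonMethod.FIELD visibility.
      static class Point {
        private double x;
        private double y;
        Point() { }
        Point(double x, double y) { this.x = x; this.y = y; }
      }

      public static void main(String[] args) throws IOException {
        Json json = new Json(true);  // pretty-printed output
        String s = json.serialize(new Point(1.0, 2.0));
        Point p = json.deserialize(s, Point.class);
        System.out.println(s + " -> " + p.x + "," + p.y);
      }
    }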

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/RandomMetricsProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/RandomMetricsProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/RandomMetricsProvider.java
new file mode 100644
index 0000000..7910711
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/RandomMetricsProvider.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.util;
+
+import java.util.Random;
+
+/**
+ * Supplies random metric values uniformly distributed between min and max.
+ */
+public class RandomMetricsProvider {
+
+  private double min;
+  private double max;
+  private Random rnd;
+
+  public RandomMetricsProvider(double min, double max) {
+    this.min = min;
+    this.max = max;
+    this.rnd = new Random();
+  }
+
+  public double next() {
+    return rnd.nextDouble() * (max - min) + min;
+  }
+}
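
next() returns rnd.nextDouble() * (max - min) + min, i.e. a value uniformly distributed in [min, max). For example:

    public class RandomMetricsProviderExample {
      public static void main(String[] args) {
        // Simulated CPU utilization samples in [0.0, 100.0).
        RandomMetricsProvider cpu = new RandomMetricsProvider(0.0, 100.0);
        System.out.println(cpu.next());
      }
    }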

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TimeStampProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TimeStampProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TimeStampProvider.java
new file mode 100644
index 0000000..ad7ec86
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TimeStampProvider.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .loadsimulator.util;
+
+/**
+ * Supplies evenly spaced timestamps for simulated metric samples.
+ */
+public class TimeStampProvider {
+  private int timeStep;
+  private long currentTime;
+  private int sendInterval;
+
+  public TimeStampProvider(long startTime, int timeStep, int sendInterval) {
+    this.timeStep = timeStep;
+    this.currentTime = startTime - timeStep;
+    this.sendInterval = sendInterval;
+  }
+
+  public long next() {
+    return currentTime += timeStep;
+  }
+
+  public long[] timestampsForNextInterval() {
+    return timestampsForInterval(sendInterval);
+  }
+
+  private long[] timestampsForInterval(int sendInterval) {
+    int steps = sendInterval / timeStep;
+    long[] timestamps = new long[steps];
+
+    for (int i = 0; i < timestamps.length; i++) {
+      timestamps[i] = next();
+    }
+
+    return timestamps;
+  }
+}
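
A worked example of the timestamp arithmetic: with timeStep = 1000 ms and sendInterval = 5000 ms, each interval yields 5000 / 1000 = 5 timestamps, starting at startTime and spaced one step apart:

    public class TimeStampProviderExample {
      public static void main(String[] args) {
        TimeStampProvider ts = new TimeStampProvider(1000000L, 1000, 5000);
        // Prints 1000000, 1001000, 1002000, 1003000, 1004000.
        for (long t : ts.timestampsForNextInterval()) {
          System.out.println(t);
        }
      }
    }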

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
new file mode 100644
index 0000000..a123e57
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Date;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE;
+
+public abstract class AbstractTimelineAggregator implements Runnable {
+  protected final PhoenixHBaseAccessor hBaseAccessor;
+  private final Log LOG;
+  protected final long checkpointDelayMillis;
+  protected final Integer resultsetFetchSize;
+  protected Configuration metricsConf;
+
+  public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
+                                    Configuration metricsConf) {
+    this.hBaseAccessor = hBaseAccessor;
+    this.metricsConf = metricsConf;
+    this.checkpointDelayMillis = SECONDS.toMillis(
+      metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120));
+    this.resultsetFetchSize = metricsConf.getInt(RESULTSET_FETCH_SIZE, 2000);
+    this.LOG = LogFactory.getLog(this.getClass());
+  }
+
+  @Override
+  public void run() {
+    LOG.info("Started Timeline aggregator thread @ " + new Date());
+    Long SLEEP_INTERVAL = getSleepIntervalMillis();
+
+    while (true) {
+      long currentTime = System.currentTimeMillis();
+      long lastCheckPointTime = -1;
+
+      try {
+        lastCheckPointTime = readCheckPoint();
+        if (isLastCheckPointTooOld(lastCheckPointTime)) {
+          LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " +
+            "lastCheckPointTime = " + lastCheckPointTime);
+          lastCheckPointTime = -1;
+        }
+        if (lastCheckPointTime == -1) {
+          // Assuming first run, save checkpoint and sleep.
+          // Set checkpoint to 2 minutes in the past to allow the
+          // agents/collectors to catch up
+          saveCheckPoint(currentTime - checkpointDelayMillis);
+        }
+      } catch (IOException io) {
+        LOG.warn("Unable to write last checkpoint time. Resuming sleep.", io);
+      }
+      long sleepTime = SLEEP_INTERVAL;
+
+      if (lastCheckPointTime != -1) {
+        LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: "
+          + ((System.currentTimeMillis() - lastCheckPointTime) / 1000)
+          + " seconds.");
+
+        long startTime = System.currentTimeMillis();
+        boolean success = doWork(lastCheckPointTime,
+          lastCheckPointTime + SLEEP_INTERVAL);
+        long executionTime = System.currentTimeMillis() - startTime;
+        long delta = SLEEP_INTERVAL - executionTime;
+
+        if (delta > 0) {
+          // Sleep for (configured sleep - time to execute task)
+          sleepTime = delta;
+        } else {
+          // No sleep because last run took too long to execute
+          LOG.info("Aggregator execution took too long, " +
+            "cancelling sleep. executionTime = " + executionTime);
+          sleepTime = 1;
+        }
+
+        LOG.debug("Aggregator sleep interval = " + sleepTime);
+
+        if (success) {
+          try {
+            saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL);
+          } catch (IOException io) {
+            LOG.warn("Error saving checkpoint, restarting aggregation at " +
+              "previous checkpoint.");
+          }
+        }
+      }
+
+      try {
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException e) {
+        LOG.info("Sleep interrupted, continuing with aggregation.");
+      }
+    }
+  }
+
+  private boolean isLastCheckPointTooOld(long checkpoint) {
+    return checkpoint != -1 &&
+      ((System.currentTimeMillis() - checkpoint) >
+        getCheckpointCutOffIntervalMillis());
+  }
+
+  private long readCheckPoint() {
+    try {
+      File checkpoint = new File(getCheckpointLocation());
+      if (checkpoint.exists()) {
+        String contents = FileUtils.readFileToString(checkpoint);
+        if (contents != null && !contents.isEmpty()) {
+          return Long.parseLong(contents);
+        }
+      }
+    } catch (IOException io) {
+      LOG.debug(io);
+    }
+    return -1;
+  }
+
+  private void saveCheckPoint(long checkpointTime) throws IOException {
+    File checkpoint = new File(getCheckpointLocation());
+    if (!checkpoint.exists()) {
+      boolean done = checkpoint.createNewFile();
+      if (!done) {
+        throw new IOException("Could not create checkpoint at location, " +
+          getCheckpointLocation());
+      }
+    }
+    FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime));
+  }
+
+  /**
+   * Read metrics written during the time interval and save the sum and total
+   * in the aggregate table.
+   *
+   * @param startTime Sample start time
+   * @param endTime Sample end time
+   */
+  protected boolean doWork(long startTime, long endTime) {
+    LOG.info("Start aggregation cycle @ " + new Date() + ", " +
+      "startTime = " + new Date(startTime) + ", endTime = " +
+      new Date(endTime));
+
+    boolean success = true;
+    PhoenixTransactSQL.Condition condition =
+      prepareMetricQueryCondition(startTime, endTime);
+
+    Connection conn = null;
+    PreparedStatement stmt = null;
+
+    try {
+      conn = hBaseAccessor.getConnection();
+      stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+
+      LOG.debug("Query issued @: " + new Date());
+      ResultSet rs = stmt.executeQuery();
+      LOG.debug("Query returned @: " + new Date());
+
+      aggregate(rs, startTime, endTime);
+      LOG.info("End aggregation cycle @ " + new Date());
+
+    } catch (SQLException e) {
+      LOG.error("Exception during aggregating metrics.", e);
+      success = false;
+    } catch (IOException e) {
+      LOG.error("Exception during aggregating metrics.", e);
+      success = false;
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+
+    LOG.info("End aggregation cycle @ " + new Date());
+    return success;
+  }
+
+  protected abstract PhoenixTransactSQL.Condition
+  prepareMetricQueryCondition(long startTime, long endTime);
+
+  protected abstract void aggregate(ResultSet rs, long startTime, long endTime)
+    throws IOException, SQLException;
+
+  protected abstract Long getSleepIntervalMillis();
+
+  protected abstract Integer getCheckpointCutOffMultiplier();
+
+  protected Long getCheckpointCutOffIntervalMillis() {
+    return getCheckpointCutOffMultiplier() * getSleepIntervalMillis();
+  }
+
+  protected abstract boolean isDisabled();
+
+  protected abstract String getCheckpointLocation();
+
+}
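
To make the contract concrete, a skeletal subclass; all names and values below are hypothetical, and prepareMetricQueryCondition()/aggregate() are stubbed:

    import java.sql.ResultSet;
    import org.apache.hadoop.conf.Configuration;

    // run() above handles checkpointing and sleep; a concrete aggregator only
    // supplies the query window, the aggregation itself, and the timing policy.
    public class FiveMinuteAggregatorSketch extends AbstractTimelineAggregator {
      public FiveMinuteAggregatorSketch(PhoenixHBaseAccessor hBaseAccessor,
                                        Configuration metricsConf) {
        super(hBaseAccessor, metricsConf);
      }

      @Override
      protected PhoenixTransactSQL.Condition prepareMetricQueryCondition(
          long startTime, long endTime) {
        return null; // build a Condition covering [startTime, endTime)
      }

      @Override
      protected void aggregate(ResultSet rs, long startTime, long endTime) {
        // fold rows into aggregates and write them back via hBaseAccessor
      }

      @Override
      protected Long getSleepIntervalMillis() {
        return 5 * 60 * 1000L; // aggregate every five minutes
      }

      @Override
      protected Integer getCheckpointCutOffMultiplier() {
        return 2; // checkpoints older than two intervals are discarded
      }

      @Override
      protected boolean isDisabled() {
        return false;
      }

      @Override
      protected String getCheckpointLocation() {
        return "/tmp/ams-five-minute-checkpoint"; // hypothetical path
      }
    }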

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Aggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Aggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Aggregator.java
new file mode 100644
index 0000000..f514298
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Aggregator.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline;
+
+
+import java.util.Map;
+
+/**
+ * Computes sum, max, min, and sample count over a metric's values.
+ */
+public class Aggregator {
+
+  public double[] calculateAggregates(Map<Long, Double> metricValues) {
+    double[] values = new double[4];
+    double max = Double.MIN_VALUE;
+    double min = Double.MAX_VALUE;
+    double sum = 0.0;
+    int metricCount = 0;
+
+    if (metricValues != null && !metricValues.isEmpty()) {
+      for (Double value : metricValues.values()) {
+        // TODO: Some nulls in data - need to investigate null values from host
+        if (value != null) {
+          if (value > max) {
+            max = value;
+          }
+          if (value < min) {
+            min = value;
+          }
+          sum += value;
+        }
+      }
+      metricCount = metricValues.values().size();
+    }
+    // BR: WHY ZERO is a good idea?
+    values[0] = sum;
+    values[1] = max != Double.MIN_VALUE ? max : 0.0;
+    values[2] = min != Double.MAX_VALUE ? min : 0.0;
+    values[3] = metricCount;
+
+    return values;
+  }
+
+}
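
A worked example of calculateAggregates(): values {2.0, 8.0, 5.0} yield sum 15.0, max 8.0, min 2.0, count 3:

    import java.util.HashMap;
    import java.util.Map;

    public class AggregatorExample {
      public static void main(String[] args) {
        Map<Long, Double> values = new HashMap<Long, Double>();
        values.put(1000L, 2.0);
        values.put(2000L, 8.0);
        values.put(3000L, 5.0);
        double[] a = new Aggregator().calculateAggregates(values);
        // a[0]=sum, a[1]=max, a[2]=min, a[3]=count
        System.out.println(a[0] + " " + a[1] + " " + a[2] + " " + a[3]);
      }
    }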

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java
new file mode 100644
index 0000000..47435f4
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline;
+
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/**
+ * Supplies JDBC connections to the metrics store.
+ */
+public interface ConnectionProvider {
+  public Connection getConnection() throws SQLException;
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
new file mode 100644
index 0000000..652c492
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+public class DefaultPhoenixDataSource implements ConnectionProvider {
+
+  static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class);
+  private static final String ZOOKEEPER_CLIENT_PORT =
+    "hbase.zookeeper.property.clientPort";
+  private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
+  private static final String ZNODE_PARENT = "zookeeper.znode.parent";
+
+  private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s";
+  private final String url;
+
+  public DefaultPhoenixDataSource(Configuration hbaseConf) {
+    String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT,
+      "2181");
+    String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM);
+    String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/hbase");
+    if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) {
+      throw new IllegalStateException("Unable to find Zookeeper quorum to " +
+        "access HBase store using Phoenix.");
+    }
+
+    url = String.format(connectionUrl,
+      zookeeperQuorum,
+      zookeeperClientPort,
+      znodeParent);
+  }
+
+  /**
+   * Get JDBC connection to HBase store. Assumption is that the hbase
+   * configuration is present on the classpath and loaded by the caller into
+   * the Configuration object.
+   * Phoenix already caches the HConnection between the client and HBase
+   * cluster.
+   *
+   * @return a {@link java.sql.Connection} to the metrics store
+   */
+  public Connection getConnection() throws SQLException {
+
+    LOG.debug("Metric store connection url: " + url);
+    try {
+      return DriverManager.getConnection(url);
+    } catch (SQLException e) {
+      LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+
+      throw e;
+    }
+  }
+
+}
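
A sketch of how the JDBC URL is assembled from the HBase config; the quorum below is hypothetical, and the client port and znode parent fall back to 2181 and /hbase:

    import org.apache.hadoop.conf.Configuration;

    public class PhoenixUrlExample {
      public static void main(String[] args) {
        Configuration hbaseConf = new Configuration(false);
        hbaseConf.set("hbase.zookeeper.quorum", "zk1.example.com,zk2.example.com");
        // Resulting URL: jdbc:phoenix:zk1.example.com,zk2.example.com:2181:/hbase
        ConnectionProvider provider = new DefaultPhoenixDataSource(hbaseConf);
        // provider.getConnection() would open a Phoenix connection to that quorum.
      }
    }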

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
new file mode 100644
index 0000000..9364187
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+
+import java.io.IOException;
+import java.net.URL;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.TimelineMetricConfiguration.HBASE_SITE_CONFIGURATION_FILE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.TimelineMetricConfiguration.METRICS_SITE_CONFIGURATION_FILE;
+
+public class HBaseTimelineMetricStore extends AbstractService
+    implements TimelineMetricStore {
+
+  static final Log LOG = LogFactory.getLog(HBaseTimelineMetricStore.class);
+  private PhoenixHBaseAccessor hBaseAccessor;
+
+  /**
+   * Construct the service.
+   *
+   */
+  public HBaseTimelineMetricStore() {
+    super(HBaseTimelineMetricStore.class.getName());
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+    if (classLoader == null) {
+      classLoader = getClass().getClassLoader();
+    }
+    URL hbaseResUrl = classLoader.getResource(HBASE_SITE_CONFIGURATION_FILE);
+    URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE);
+    LOG.info("Found hbase site configuration: " + hbaseResUrl);
+    LOG.info("Found metric service configuration: " + amsResUrl);
+
+    if (hbaseResUrl == null) {
+      throw new IllegalStateException("Unable to initialize the metrics " +
+        "subsystem. No hbase-site present in the classpath.");
+    }
+
+    if (amsResUrl == null) {
+      throw new IllegalStateException("Unable to initialize the metrics " +
+        "subsystem. No ams-site present in the classpath.");
+    }
+
+    Configuration hbaseConf = new Configuration(true);
+    hbaseConf.addResource(hbaseResUrl.toURI().toURL());
+    Configuration metricsConf = new Configuration(true);
+    metricsConf.addResource(amsResUrl.toURI().toURL());
+
+    initializeSubsystem(hbaseConf, metricsConf);
+  }
+
+  private void initializeSubsystem(Configuration hbaseConf,
+                                   Configuration metricsConf) {
+    hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf);
+    hBaseAccessor.initMetricSchema();
+
+    // Start the cluster aggregator
+    TimelineMetricClusterAggregator minuteClusterAggregator =
+      new TimelineMetricClusterAggregator(hBaseAccessor, metricsConf);
+    if (!minuteClusterAggregator.isDisabled()) {
+      Thread aggregatorThread = new Thread(minuteClusterAggregator);
+      aggregatorThread.start();
+    }
+
+    // Start the cluster aggregator hourly
+    TimelineMetricClusterAggregatorHourly hourlyClusterAggregator =
+      new TimelineMetricClusterAggregatorHourly(hBaseAccessor, metricsConf);
+    if (!hourlyClusterAggregator.isDisabled()) {
+      Thread aggregatorThread = new Thread(hourlyClusterAggregator);
+      aggregatorThread.start();
+    }
+
+    // Start the 5 minute aggregator
+    TimelineMetricAggregator minuteHostAggregator =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute
+        (hBaseAccessor, metricsConf);
+    if (!minuteHostAggregator.isDisabled()) {
+      Thread minuteAggregatorThread = new Thread(minuteHostAggregator);
+      minuteAggregatorThread.start();
+    }
+
+    // Start hourly host aggregator
+    TimelineMetricAggregator hourlyHostAggregator =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly
+        (hBaseAccessor, metricsConf);
+    if (!hourlyHostAggregator.isDisabled()) {
+      Thread aggregatorHourlyThread = new Thread(hourlyHostAggregator);
+      aggregatorHourlyThread.start();
+    }
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+  //TODO: update to work with HOSTS_COUNT and METRIC_COUNT
+  @Override
+  public TimelineMetrics getTimelineMetrics(List<String> metricNames,
+      String hostname, String applicationId, String instanceId,
+      Long startTime, Long endTime, Integer limit,
+      boolean groupedByHosts) throws SQLException, IOException {
+
+    Condition condition = new Condition(metricNames, hostname, applicationId,
+      instanceId, startTime, endTime, limit, groupedByHosts);
+
+    if (hostname == null) {
+      return hBaseAccessor.getAggregateMetricRecords(condition);
+    }
+
+    return hBaseAccessor.getMetricRecords(condition);
+  }
+
+  @Override
+  public TimelineMetric getTimelineMetric(String metricName, String hostname,
+      String applicationId, String instanceId, Long startTime,
+      Long endTime, Integer limit)
+      throws SQLException, IOException {
+
+    TimelineMetrics metrics = hBaseAccessor.getMetricRecords(
+      new Condition(Collections.singletonList(metricName), hostname,
+        applicationId, instanceId, startTime, endTime, limit, true)
+    );
+
+    TimelineMetric metric = new TimelineMetric();
+    List<TimelineMetric> metricList = metrics.getMetrics();
+
+    if (metricList != null && !metricList.isEmpty()) {
+      metric.setMetricName(metricList.get(0).getMetricName());
+      metric.setAppId(metricList.get(0).getAppId());
+      metric.setInstanceId(metricList.get(0).getInstanceId());
+      metric.setHostName(metricList.get(0).getHostName());
+      // Assumption that metrics are ordered by start time
+      metric.setStartTime(metricList.get(0).getStartTime());
+      Map<Long, Double> metricRecords = new HashMap<Long, Double>();
+      for (TimelineMetric timelineMetric : metricList) {
+        metricRecords.putAll(timelineMetric.getMetricValues());
+      }
+      metric.setMetricValues(metricRecords);
+    }
+
+    return metric;
+  }
+
+
+  @Override
+  public TimelinePutResponse putMetrics(TimelineMetrics metrics)
+    throws SQLException, IOException {
+
+    // Error indicated by the Sql exception
+    TimelinePutResponse response = new TimelinePutResponse();
+
+    hBaseAccessor.insertMetricRecords(metrics);
+
+    return response;
+  }
+}
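
A sketch mirroring the dispatch in getTimelineMetrics() above: a null hostname falls back to cluster-level aggregates, otherwise per-host records are read. The metric name, app id, and limit below are hypothetical:

    import java.util.Collections;
    import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;

    public class MetricQuerySketch {
      static TimelineMetrics query(PhoenixHBaseAccessor accessor, String hostname,
                                   Long start, Long end) throws Exception {
        PhoenixTransactSQL.Condition condition = new PhoenixTransactSQL.Condition(
            Collections.singletonList("cpu_user"), hostname, "HOST", null,
            start, end, 100, false);
        return hostname == null
            ? accessor.getAggregateMetricRecords(condition)
            : accessor.getMetricRecords(condition);
      }
    }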

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java
new file mode 100644
index 0000000..61e15d7
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.annotate.JsonSubTypes;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * Base class for JSON-serializable metric aggregates (sum, deviation, max,
+ * min).
+ */
+@JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class),
+  @JsonSubTypes.Type(value = MetricHostAggregate.class)})
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class MetricAggregate {
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  protected Double sum = 0.0;
+  protected Double deviation;
+  protected Double max = Double.MIN_VALUE;
+  protected Double min = Double.MAX_VALUE;
+
+  public MetricAggregate() {
+  }
+
+  MetricAggregate(Double sum, Double deviation, Double max,
+                  Double min) {
+    this.sum = sum;
+    this.deviation = deviation;
+    this.max = max;
+    this.min = min;
+  }
+
+  void updateSum(Double sum) {
+    this.sum += sum;
+  }
+
+  void updateMax(Double max) {
+    if (max > this.max) {
+      this.max = max;
+    }
+  }
+
+  void updateMin(Double min) {
+    if (min < this.min) {
+      this.min = min;
+    }
+  }
+
+  @JsonProperty("sum")
+  Double getSum() {
+    return sum;
+  }
+
+  @JsonProperty("deviation")
+  Double getDeviation() {
+    return deviation;
+  }
+
+  @JsonProperty("max")
+  Double getMax() {
+    return max;
+  }
+
+  @JsonProperty("min")
+  Double getMin() {
+    return min;
+  }
+
+  public void setSum(Double sum) {
+    this.sum = sum;
+  }
+
+  public void setDeviation(Double deviation) {
+    this.deviation = deviation;
+  }
+
+  public void setMax(Double max) {
+    this.max = max;
+  }
+
+  public void setMin(Double min) {
+    this.min = min;
+  }
+
+  public String toJSON() throws IOException {
+    return mapper.writeValueAsString(this);
+  }
+}
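
A round-trip sketch for the JSON support; values are arbitrary, and the setters are public, so this works from any package:

    public class MetricAggregateExample {
      public static void main(String[] args) throws java.io.IOException {
        MetricAggregate agg = new MetricAggregate();
        agg.setSum(10.0);
        agg.setDeviation(0.0);
        agg.setMax(7.5);
        agg.setMin(2.5);
        // e.g. {"sum":10.0,"deviation":0.0,"max":7.5,"min":2.5}
        System.out.println(agg.toJSON());
      }
    }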

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java
new file mode 100644
index 0000000..c13c85f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Cluster-level aggregate of a metric across hosts.
+ */
+public class MetricClusterAggregate extends MetricAggregate {
+  private int numberOfHosts;
+
+  @JsonCreator
+  public MetricClusterAggregate() {
+  }
+
+  MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
+                         Double max, Double min) {
+    super(sum, deviation, max, min);
+    this.numberOfHosts = numberOfHosts;
+  }
+
+  @JsonProperty("numberOfHosts")
+  int getNumberOfHosts() {
+    return numberOfHosts;
+  }
+
+  void updateNumberOfHosts(int count) {
+    this.numberOfHosts += count;
+  }
+
+  public void setNumberOfHosts(int numberOfHosts) {
+    this.numberOfHosts = numberOfHosts;
+  }
+
+  /**
+   * Update min, max, sum, and host count from another aggregate.
+   */
+  void updateAggregates(MetricClusterAggregate hostAggregate) {
+    updateMax(hostAggregate.getMax());
+    updateMin(hostAggregate.getMin());
+    updateSum(hostAggregate.getSum());
+    updateNumberOfHosts(hostAggregate.getNumberOfHosts());
+  }
+
+  @Override
+  public String toString() {
+    return "MetricClusterAggregate{" +
+      "sum=" + sum +
+      ", numberOfHosts=" + numberOfHosts +
+      ", deviation=" + deviation +
+      ", max=" + max +
+      ", min=" + min +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
new file mode 100644
index 0000000..02cc207
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Represents an aggregation of minute-based values at a resolution greater
+ * than a minute.
+ */
+public class MetricHostAggregate extends MetricAggregate {
+
+  private long numberOfSamples = 0;
+
+  @JsonCreator
+  public MetricHostAggregate() {
+    super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE);
+  }
+
+  public MetricHostAggregate(Double sum, int numberOfSamples,
+                             Double deviation,
+                             Double max, Double min) {
+    super(sum, deviation, max, min);
+    this.numberOfSamples = numberOfSamples;
+  }
+
+  @JsonProperty("numberOfSamples")
+  long getNumberOfSamples() {
+    return numberOfSamples == 0 ? 1 : numberOfSamples;
+  }
+
+  void updateNumberOfSamples(long count) {
+    this.numberOfSamples += count;
+  }
+
+  public void setNumberOfSamples(long numberOfSamples) {
+    this.numberOfSamples = numberOfSamples;
+  }
+
+  public double getAvg() {
+    return sum / numberOfSamples;
+  }
+
+  /**
+   * Update min, max, sum, and sample count from another aggregate.
+   */
+  void updateAggregates(MetricHostAggregate hostAggregate) {
+    updateMax(hostAggregate.getMax());
+    updateMin(hostAggregate.getMin());
+    updateSum(hostAggregate.getSum());
+    updateNumberOfSamples(hostAggregate.getNumberOfSamples());
+  }
+
+  @Override
+  public String toString() {
+    return "MetricHostAggregate{" +
+      "sum=" + sum +
+      ", numberOfSamples=" + numberOfSamples +
+      ", deviation=" + deviation +
+      ", max=" + max +
+      ", min=" + min +
+      '}';
+  }
+}
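
A worked example of merging two minute aggregates into a coarser one; values are arbitrary, and since updateAggregates() is package-private the sketch assumes it lives in the same package, as a test would:

    public class MetricHostAggregateExample {
      public static void main(String[] args) {
        MetricHostAggregate first = new MetricHostAggregate(10.0, 5, 0.0, 4.0, 1.0);
        MetricHostAggregate second = new MetricHostAggregate(20.0, 5, 0.0, 6.0, 2.0);
        first.updateAggregates(second);
        // sum=30.0, samples=10, max=6.0, min=1.0 => avg = 3.0
        System.out.println(first + " avg=" + first.getAvg());
      }
    }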

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java
new file mode 100644
index 0000000..88a427a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline;
+
+/**
+ * RuntimeException thrown when initialization of the metrics schema fails.
+ * It is a RuntimeException since the situation is not recoverable, and it
+ * should be handled by the main or service method, followed by shutdown.
+ */
+public class MetricsInitializationException extends RuntimeException {
+  public MetricsInitializationException() {
+  }
+
+  public MetricsInitializationException(String msg) {
+    super(msg);
+  }
+
+  public MetricsInitializationException(Throwable t) {
+    super(t);
+  }
+
+  public MetricsInitializationException(String msg, Throwable t) {
+    super(msg, t);
+  }
+
+}
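
As the Javadoc above suggests, the intended handling is a catch at the
service entry point followed by shutdown. A minimal sketch (hypothetical
main method with an accessor and LOG already in scope, not part of this
patch):

    try {
      accessor.initMetricSchema();  // throws MetricsInitializationException
    } catch (MetricsInitializationException e) {
      LOG.error("Metrics schema could not be initialized; shutting down.", e);
      System.exit(1);  // unrecoverable, so stop the service
    }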

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
new file mode 100644
index 0000000..4f248b7
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -0,0 +1,678 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_ENCODING;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_METRICS_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RETRY_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
+
+/**
+ * Provides a facade over the Phoenix API to access HBase schema
+ */
+public class PhoenixHBaseAccessor {
+
+  private static final Log LOG = LogFactory.getLog(PhoenixHBaseAccessor.class);
+  private final Configuration hbaseConf;
+  private final Configuration metricsConf;
+  private final RetryCounterFactory retryCounterFactory;
+
+  static final int PHOENIX_MAX_MUTATION_STATE_SIZE = 50000;
+  /**
+   * 4 metrics/min * 60 * 24: Retrieve data for 1 day.
+   */
+  private static final int METRICS_PER_MINUTE = 4;
+  public static int RESULTSET_LIMIT = (int)TimeUnit.DAYS.toMinutes(1) *
+    METRICS_PER_MINUTE;
+  private static ObjectMapper mapper = new ObjectMapper();
+
+  private static TypeReference<Map<Long, Double>> metricValuesTypeRef =
+    new TypeReference<Map<Long, Double>>() {};
+  private final ConnectionProvider dataSource;
+
+  public PhoenixHBaseAccessor(Configuration hbaseConf,
+                              Configuration metricsConf){
+    this(hbaseConf, metricsConf, new DefaultPhoenixDataSource(hbaseConf));
+  }
+
+  public PhoenixHBaseAccessor(Configuration hbaseConf,
+                              Configuration metricsConf,
+                              ConnectionProvider dataSource) {
+    this.hbaseConf = hbaseConf;
+    this.metricsConf = metricsConf;
+    RESULTSET_LIMIT = metricsConf.getInt(GLOBAL_RESULT_LIMIT, 5760);
+    try {
+      Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
+    } catch (ClassNotFoundException e) {
+      LOG.error("Phoenix client jar not found in the classpath.", e);
+      throw new IllegalStateException(e);
+    }
+    this.dataSource = dataSource;
+    this.retryCounterFactory = new RetryCounterFactory(
+      metricsConf.getInt(GLOBAL_MAX_RETRIES, 10),
+      (int) SECONDS.toMillis(metricsConf.getInt(GLOBAL_RETRY_INTERVAL, 5)));
+  }
+
+
+  private Connection getConnectionRetryingOnException()
+    throws SQLException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try{
+        return getConnection();
+      } catch (SQLException e) {
+        if(!retryCounter.shouldRetry()){
+          LOG.error("HBaseAccessor getConnection failed after "
+            + retryCounter.getMaxAttempts() + " attempts");
+          throw e;
+        }
+      }
+      retryCounter.sleepUntilNextRetry();
+    }
+  }
+
+
+  /**
+   * Get JDBC connection to HBase store. Assumption is that the hbase
+   * configuration is present on the classpath and loaded by the caller into
+   * the Configuration object.
+   * Phoenix already caches the HConnection between the client and HBase
+   * cluster.
+   *
+   * @return a {@link java.sql.Connection} to the HBase store
+   */
+  public Connection getConnection() throws SQLException {
+    return dataSource.getConnection();
+  }
+
+  public static Map<Long, Double> readMetricFromJSON(String json)
+    throws IOException {
+    return mapper.readValue(json, metricValuesTypeRef);
+  }
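
Metric values travel as a JSON map of timestamp to value. A small round-trip
sketch with illustrative numbers (readMetricFromJSON parses exactly what
Jackson's writeValueAsString produces for a Map<Long, Double>):

    Map<Long, Double> values = new HashMap<Long, Double>();
    values.put(1415174400000L, 0.25);
    values.put(1415174415000L, 0.75);

    String json = new ObjectMapper().writeValueAsString(values);
    Map<Long, Double> parsed =
      (Map<Long, Double>) PhoenixHBaseAccessor.readMetricFromJSON(json);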
+
+  @SuppressWarnings("unchecked")
+  static TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
+    throws SQLException, IOException {
+    TimelineMetric metric = new TimelineMetric();
+    metric.setMetricName(rs.getString("METRIC_NAME"));
+    metric.setAppId(rs.getString("APP_ID"));
+    metric.setInstanceId(rs.getString("INSTANCE_ID"));
+    metric.setHostName(rs.getString("HOSTNAME"));
+    metric.setTimestamp(rs.getLong("SERVER_TIME"));
+    metric.setStartTime(rs.getLong("START_TIME"));
+    metric.setType(rs.getString("UNITS"));
+    metric.setMetricValues(
+      (Map<Long, Double>) readMetricFromJSON(rs.getString("METRICS")));
+    return metric;
+  }
+
+  static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs)
+    throws SQLException, IOException {
+    TimelineMetric metric = new TimelineMetric();
+    metric.setMetricName(rs.getString("METRIC_NAME"));
+    metric.setAppId(rs.getString("APP_ID"));
+    metric.setInstanceId(rs.getString("INSTANCE_ID"));
+    metric.setHostName(rs.getString("HOSTNAME"));
+    metric.setTimestamp(rs.getLong("SERVER_TIME"));
+    metric.setType(rs.getString("UNITS"));
+    return metric;
+  }
+
+  static MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs)
+    throws SQLException {
+    MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
+    metricHostAggregate.setSum(rs.getDouble("METRIC_SUM"));
+    metricHostAggregate.setMax(rs.getDouble("METRIC_MAX"));
+    metricHostAggregate.setMin(rs.getDouble("METRIC_MIN"));
+    metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT"));
+
+    metricHostAggregate.setDeviation(0.0);
+    return metricHostAggregate;
+  }
+
+  static TimelineClusterMetric
+  getTimelineMetricClusterKeyFromResultSet(ResultSet rs)
+    throws SQLException, IOException {
+    TimelineClusterMetric metric = new TimelineClusterMetric(
+      rs.getString("METRIC_NAME"),
+      rs.getString("APP_ID"),
+      rs.getString("INSTANCE_ID"),
+      rs.getLong("SERVER_TIME"),
+      rs.getString("UNITS"));
+
+    return metric;
+  }
+
+  static MetricClusterAggregate
+  getMetricClusterAggregateFromResultSet(ResultSet rs)
+    throws SQLException {
+    MetricClusterAggregate agg = new MetricClusterAggregate();
+    agg.setSum(rs.getDouble("METRIC_SUM"));
+    agg.setMax(rs.getDouble("METRIC_MAX"));
+    agg.setMin(rs.getDouble("METRIC_MIN"));
+    agg.setNumberOfHosts(rs.getInt("HOSTS_COUNT"));
+
+    agg.setDeviation(0.0);
+
+    return agg;
+  }
+
+  protected void initMetricSchema() {
+    Connection conn = null;
+    Statement stmt = null;
+
+    String encoding = metricsConf.get(HBASE_ENCODING_SCHEME, DEFAULT_ENCODING);
+    String compression = metricsConf.get(HBASE_COMPRESSION_SCHEME,
+      DEFAULT_TABLE_COMPRESSION);
+    // TTLs are in seconds: 86400 = 1 day, 604800 = 7 days,
+    // 2592000 = 30 days, 31536000 = 365 days.
+    String precisionTtl = metricsConf.get(PRECISION_TABLE_TTL, "86400");
+    String hostMinTtl = metricsConf.get(HOST_MINUTE_TABLE_TTL, "604800");
+    String hostHourTtl = metricsConf.get(HOST_HOUR_TABLE_TTL, "2592000");
+    String clusterMinTtl = metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "2592000");
+    String clusterHourTtl = metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "31536000");
+
+    try {
+      LOG.info("Initializing metrics schema...");
+      conn = getConnectionRetryingOnException();
+      stmt = conn.createStatement();
+
+      stmt.executeUpdate(String.format(CREATE_METRICS_TABLE_SQL,
+        encoding, precisionTtl, compression));
+      stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL,
+        encoding, hostHourTtl, compression));
+      stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL,
+        encoding, hostMinTtl, compression));
+      stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL,
+        encoding, clusterMinTtl, compression));
+      stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL,
+        encoding, clusterHourTtl, compression));
+      conn.commit();
+    } catch (SQLException sql) {
+      LOG.warn("Error creating Metrics Schema in HBase using Phoenix.", sql);
+      throw new MetricsInitializationException(
+        "Error creating Metrics Schema in HBase using Phoenix.", sql);
+    } catch (InterruptedException e) {
+      LOG.warn("Error creating Metrics Schema in HBase using Phoenix.", e);
+      throw new MetricsInitializationException(
+        "Error creating Metrics Schema in HBase using Phoenix.", e);
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+    }
+  }
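
The defaults above can be overridden through the metrics Configuration before
the accessor is constructed. A minimal sketch using the imported configuration
keys (the values and the SNAPPY choice are examples only; hbaseConf is assumed
to carry the usual HBase client settings, and the initMetricSchema() call
assumes a caller in this package since the method is protected):

    Configuration metricsConf = new Configuration();
    // TTLs are seconds: keep precision data 3 days instead of the 1-day default.
    metricsConf.set(PRECISION_TABLE_TTL, "259200");
    metricsConf.set(HBASE_COMPRESSION_SCHEME, "SNAPPY");

    PhoenixHBaseAccessor accessor =
      new PhoenixHBaseAccessor(hbaseConf, metricsConf);
    accessor.initMetricSchema();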
+
+  public void insertMetricRecords(TimelineMetrics metrics)
+    throws SQLException, IOException {
+
+    List<TimelineMetric> timelineMetrics = metrics.getMetrics();
+    if (timelineMetrics == null || timelineMetrics.isEmpty()) {
+      LOG.debug("Empty metrics insert request.");
+      return;
+    }
+
+    Connection conn = getConnection();
+    PreparedStatement metricRecordStmt = null;
+    long currentTime = System.currentTimeMillis();
+
+    try {
+      metricRecordStmt = conn.prepareStatement(String.format(
+        UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME));
+
+      for (TimelineMetric metric : timelineMetrics) {
+        metricRecordStmt.clearParameters();
+
+        LOG.trace("host: " + metric.getHostName() + ", " +
+          "metricName = " + metric.getMetricName() + ", " +
+          "values: " + metric.getMetricValues());
+        Aggregator agg = new Aggregator();
+        double[] aggregates =  agg.calculateAggregates(
+          metric.getMetricValues());
+
+        metricRecordStmt.setString(1, metric.getMetricName());
+        metricRecordStmt.setString(2, metric.getHostName());
+        metricRecordStmt.setString(3, metric.getAppId());
+        metricRecordStmt.setString(4, metric.getInstanceId());
+        metricRecordStmt.setLong(5, currentTime);
+        metricRecordStmt.setLong(6, metric.getStartTime());
+        metricRecordStmt.setString(7, metric.getType());
+        metricRecordStmt.setDouble(8, aggregates[0]);
+        metricRecordStmt.setDouble(9, aggregates[1]);
+        metricRecordStmt.setDouble(10, aggregates[2]);
+        metricRecordStmt.setLong(11, (long)aggregates[3]);
+        String json =
+          TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
+        metricRecordStmt.setString(12, json);
+
+        try {
+          metricRecordStmt.executeUpdate();
+        } catch (SQLException sql) {
+          LOG.error(sql);
+        }
+      }
+
+      conn.commit();
+
+    } finally {
+      if (metricRecordStmt != null) {
+        try {
+          metricRecordStmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+  }
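
A sketch of assembling a payload for the insert above (hypothetical metric
name, app id and host; the setters mirror the getters used by
getTimelineMetricFromResultSet, and accessor is an already-constructed
PhoenixHBaseAccessor):

    TimelineMetric metric = new TimelineMetric();
    metric.setMetricName("cpu_user");
    metric.setHostName("host1.example.com");
    metric.setAppId("HOST");
    metric.setStartTime(System.currentTimeMillis());
    Map<Long, Double> values = new HashMap<Long, Double>();
    values.put(metric.getStartTime(), 12.5);
    metric.setMetricValues(values);

    TimelineMetrics payload = new TimelineMetrics();
    payload.getMetrics().add(metric);
    accessor.insertMetricRecords(payload);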
+
+
+  @SuppressWarnings("unchecked")
+  public TimelineMetrics getMetricRecords(final Condition condition)
+    throws SQLException, IOException {
+
+    if (condition.isEmpty()) {
+      throw new SQLException("No filter criteria specified.");
+    }
+
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
+    TimelineMetrics metrics = new TimelineMetrics();
+
+    try {
+      stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+
+      ResultSet rs = stmt.executeQuery();
+
+      while (rs.next()) {
+        TimelineMetric metric = getTimelineMetricFromResultSet(rs);
+
+        if (condition.isGrouped()) {
+          metrics.addOrMergeTimelineMetric(metric);
+        } else {
+          metrics.getMetrics().add(metric);
+        }
+      }
+
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+    return metrics;
+  }
+
+  public void saveHostAggregateRecords(Map<TimelineMetric,
+    MetricHostAggregate> hostAggregateMap, String phoenixTableName)
+    throws SQLException {
+
+    if (hostAggregateMap != null && !hostAggregateMap.isEmpty()) {
+      Connection conn = getConnection();
+      PreparedStatement stmt = null;
+
+      long start = System.currentTimeMillis();
+      int rowCount = 0;
+
+      try {
+        stmt = conn.prepareStatement(
+          String.format(UPSERT_AGGREGATE_RECORD_SQL, phoenixTableName));
+
+        for (Map.Entry<TimelineMetric, MetricHostAggregate> metricAggregate :
+          hostAggregateMap.entrySet()) {
+
+          TimelineMetric metric = metricAggregate.getKey();
+          MetricHostAggregate hostAggregate = metricAggregate.getValue();
+
+          rowCount++;
+          stmt.clearParameters();
+          stmt.setString(1, metric.getMetricName());
+          stmt.setString(2, metric.getHostName());
+          stmt.setString(3, metric.getAppId());
+          stmt.setString(4, metric.getInstanceId());
+          stmt.setLong(5, metric.getTimestamp());
+          stmt.setString(6, metric.getType());
+          stmt.setDouble(7, hostAggregate.getSum());
+          stmt.setDouble(8, hostAggregate.getMax());
+          stmt.setDouble(9, hostAggregate.getMin());
+          stmt.setDouble(10, hostAggregate.getNumberOfSamples());
+
+          try {
+            // TODO: Why is this exception swallowed instead of propagated?
+            stmt.executeUpdate();
+          } catch (SQLException sql) {
+            LOG.error(sql);
+          }
+
+          if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
+            conn.commit();
+            rowCount = 0;
+          }
+
+        }
+
+        conn.commit();
+
+      } finally {
+        if (stmt != null) {
+          try {
+            stmt.close();
+          } catch (SQLException e) {
+            // Ignore
+          }
+        }
+        if (conn != null) {
+          try {
+            conn.close();
+          } catch (SQLException sql) {
+            // Ignore
+          }
+        }
+      }
+
+      long end = System.currentTimeMillis();
+
+      if ((end - start) > 60000L) {
+        LOG.info("Time to save map: " + (end - start) + ", " +
+          "thread = " + Thread.currentThread().getName());
+      }
+    }
+  }
+
+  /**
+   * Save cluster-wide metric aggregate records.
+   *
+   * @throws SQLException if the upsert statement cannot be prepared or committed
+   */
+  public void saveClusterAggregateRecords(
+    Map<TimelineClusterMetric, MetricClusterAggregate> records)
+    throws SQLException {
+
+    if (records == null || records.isEmpty()) {
+      LOG.debug("Empty aggregate records.");
+      return;
+    }
+
+    long start = System.currentTimeMillis();
+
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
+    try {
+      stmt = conn.prepareStatement(UPSERT_CLUSTER_AGGREGATE_SQL);
+      int rowCount = 0;
+
+      for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate>
+        aggregateEntry : records.entrySet()) {
+        TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
+        MetricClusterAggregate aggregate = aggregateEntry.getValue();
+
+        LOG.trace("clusterMetric = " + clusterMetric + ", " +
+          "aggregate = " + aggregate);
+
+        rowCount++;
+        stmt.clearParameters();
+        stmt.setString(1, clusterMetric.getMetricName());
+        stmt.setString(2, clusterMetric.getAppId());
+        stmt.setString(3, clusterMetric.getInstanceId());
+        stmt.setLong(4, clusterMetric.getTimestamp());
+        stmt.setString(5, clusterMetric.getType());
+        stmt.setDouble(6, aggregate.getSum());
+        stmt.setInt(7, aggregate.getNumberOfHosts());
+        stmt.setDouble(8, aggregate.getMax());
+        stmt.setDouble(9, aggregate.getMin());
+
+        try {
+          stmt.executeUpdate();
+        } catch (SQLException sql) {
+          // TODO: Why is this exception swallowed instead of propagated?
+          LOG.error(sql);
+        }
+
+        if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
+          conn.commit();
+          rowCount = 0;
+        }
+      }
+
+      conn.commit();
+
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+    long end = System.currentTimeMillis();
+    if ((end - start) > 60000L) {
+      LOG.info("Time to save: " + (end - start) + ", " +
+        "thread = " + Thread.currentThread().getName());
+    }
+  }
+
+  /**
+   * Save hourly cluster-wide metric aggregate records.
+   *
+   * @throws SQLException if the upsert statement cannot be prepared or committed
+   */
+  public void saveClusterAggregateHourlyRecords(
+    Map<TimelineClusterMetric, MetricHostAggregate> records,
+    String tableName)
+    throws SQLException {
+    if (records == null || records.isEmpty()) {
+      LOG.debug("Empty aggregate records.");
+      return;
+    }
+
+    long start = System.currentTimeMillis();
+
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
+    try {
+      stmt = conn.prepareStatement(String.format
+        (UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName));
+      int rowCount = 0;
+
+      for (Map.Entry<TimelineClusterMetric, MetricHostAggregate>
+        aggregateEntry : records.entrySet()) {
+        TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
+        MetricHostAggregate aggregate = aggregateEntry.getValue();
+
+        LOG.trace("clusterMetric = " + clusterMetric + ", " +
+          "aggregate = " + aggregate);
+
+        rowCount++;
+        stmt.clearParameters();
+        stmt.setString(1, clusterMetric.getMetricName());
+        stmt.setString(2, clusterMetric.getAppId());
+        stmt.setString(3, clusterMetric.getInstanceId());
+        stmt.setLong(4, clusterMetric.getTimestamp());
+        stmt.setString(5, clusterMetric.getType());
+        stmt.setDouble(6, aggregate.getSum());
+        // The hourly cluster table stores a sample count in slot 7, not a
+        // host count as in the non-hourly variant above.
+        stmt.setLong(7, aggregate.getNumberOfSamples());
+        stmt.setDouble(8, aggregate.getMax());
+        stmt.setDouble(9, aggregate.getMin());
+
+        try {
+          stmt.executeUpdate();
+        } catch (SQLException sql) {
+          // TODO: exception is swallowed, so the caller cannot verify this upsert succeeded
+          LOG.error(sql);
+        }
+
+        if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
+          conn.commit();
+          rowCount = 0;
+        }
+      }
+
+      conn.commit();
+
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+    long end = System.currentTimeMillis();
+    if ((end - start) > 60000L) {
+      LOG.info("Time to save: " + (end - start) + ", " +
+        "thread = " + Thread.currentThread().getName());
+    }
+  }
+
+
+  public TimelineMetrics getAggregateMetricRecords(final Condition condition)
+    throws SQLException {
+
+    if (condition.isEmpty()) {
+      throw new SQLException("No filter criteria specified.");
+    }
+
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
+    TimelineMetrics metrics = new TimelineMetrics();
+
+    try {
+      stmt = PhoenixTransactSQL.prepareGetAggregateSqlStmt(conn, condition);
+
+      ResultSet rs = stmt.executeQuery();
+
+      while (rs.next()) {
+        TimelineMetric metric = new TimelineMetric();
+        metric.setMetricName(rs.getString("METRIC_NAME"));
+        metric.setAppId(rs.getString("APP_ID"));
+        metric.setInstanceId(rs.getString("INSTANCE_ID"));
+        metric.setTimestamp(rs.getLong("SERVER_TIME"));
+        metric.setStartTime(rs.getLong("SERVER_TIME"));
+        Map<Long, Double> valueMap = new HashMap<Long, Double>();
+        valueMap.put(rs.getLong("SERVER_TIME"),
+          rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT"));
+        metric.setMetricValues(valueMap);
+
+        if (condition.isGrouped()) {
+          metrics.addOrMergeTimelineMetric(metric);
+        } else {
+          metrics.getMetrics().add(metric);
+        }
+      }
+
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+    LOG.info("Aggregate records size: " + metrics.getMetrics().size());
+    return metrics;
+  }
+}
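
Note that getAggregateMetricRecords reports, per timestamp, the cluster-wide
per-host average (METRIC_SUM divided by HOSTS_COUNT). With illustrative
numbers:

    // METRIC_SUM = 120.0 over HOSTS_COUNT = 4 hosts at SERVER_TIME t
    double perHostAverage = 120.0 / 4;  // the value stored for t is 30.0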
