Repository: ambari
Updated Branches:
  refs/heads/trunk 41a41e64b -> c0da0a1d9


AMBARI-11010. Aggregator function does not work for point in time service 
component metric query. (swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c0da0a1d
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c0da0a1d
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c0da0a1d

Branch: refs/heads/trunk
Commit: c0da0a1d9b295b2ac7f0ac5638b255852e816feb
Parents: 41a41e6
Author: Siddharth Wagle <swa...@hortonworks.com>
Authored: Fri May 8 10:08:16 2015 -0700
Committer: Siddharth Wagle <swa...@hortonworks.com>
Committed: Fri May 8 10:42:26 2015 -0700

----------------------------------------------------------------------
 .../timeline/SingleValuedTimelineMetric.java    | 107 +++++++++++
 .../metrics2/sink/timeline/TimelineMetrics.java |  26 +++
 .../timeline/HBaseTimelineMetricStore.java      |   3 +-
 .../metrics/timeline/PhoenixHBaseAccessor.java  | 191 +++++++------------
 .../TimelineMetricHostAggregator.java           |   9 +-
 .../aggregators/TimelineMetricReadHelper.java   |  29 ++-
 .../metrics/timeline/ITClusterAggregator.java   |   1 -
 .../metrics/timeline/ITMetricAggregator.java    |  20 +-
 .../timeline/ITPhoenixHBaseAccessor.java        |  52 ++++-
 9 files changed, 292 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/c0da0a1d/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/SingleValuedTimelineMetric.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/SingleValuedTimelineMetric.java
 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/SingleValuedTimelineMetric.java
new file mode 100644
index 0000000..8ecca54
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/SingleValuedTimelineMetric.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+/**
+ * This class prevents creating a TreeMap for every instantiation of a metric
+ * read from the store. The methods are meant to provide interoperability
+ * with @TimelineMetric
+ */
+public class SingleValuedTimelineMetric {
+  private Long timestamp;
+  private Double value;
+  private String metricName;
+  private String appId;
+  private String instanceId;
+  private String hostName;
+  private Long startTime;
+  private String type;
+
+  public void setSingleTimeseriesValue(Long timestamp, Double value) {
+    this.timestamp = timestamp;
+    this.value = value;
+  }
+
+  public SingleValuedTimelineMetric(String metricName, String appId,
+                                    String instanceId, String hostName,
+                                    long timestamp, long startTime, String 
type) {
+    this.metricName = metricName;
+    this.appId = appId;
+    this.instanceId = instanceId;
+    this.hostName = hostName;
+    this.timestamp = timestamp;
+    this.startTime = startTime;
+    this.type = type;
+  }
+
+  public Long getTimestamp() {
+    return timestamp;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public Double getValue() {
+    return value;
+  }
+
+  public String getMetricName() {
+    return metricName;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public String getHostName() {
+    return hostName;
+  }
+
+  public boolean equalsExceptTime(TimelineMetric metric) {
+    if (!metricName.equals(metric.getMetricName())) return false;
+    if (hostName != null ? !hostName.equals(metric.getHostName()) : 
metric.getHostName() != null)
+      return false;
+    if (appId != null ? !appId.equals(metric.getAppId()) : metric.getAppId() 
!= null)
+      return false;
+    if (instanceId != null ? !instanceId.equals(metric.getInstanceId()) : 
metric.getInstanceId() != null) return false;
+
+    return true;
+  }
+
+  public TimelineMetric getTimelineMetric() {
+    TimelineMetric metric = new TimelineMetric();
+    metric.setMetricName(this.metricName);
+    metric.setAppId(this.appId);
+    metric.setHostName(this.hostName);
+    metric.setType(this.type);
+    metric.setInstanceId(this.instanceId);
+    metric.setStartTime(this.startTime);
+    metric.setTimestamp(this.timestamp);
+    metric.getMetricValues().put(timestamp, value);
+    return metric;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/c0da0a1d/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
index 3eb0e89..383079a 100644
--- 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
+++ 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
@@ -99,4 +99,30 @@ public class TimelineMetrics {
       allMetrics.add(metric);
     }
   }
+
+  // Optimization that addresses too many TreeMaps from getting created.
+  public void addOrMergeTimelineMetric(SingleValuedTimelineMetric metric) {
+    TimelineMetric metricToMerge = null;
+
+    if (!allMetrics.isEmpty()) {
+      for (TimelineMetric timelineMetric : allMetrics) {
+        if (metric.equalsExceptTime(timelineMetric)) {
+          metricToMerge = timelineMetric;
+          break;
+        }
+      }
+    }
+
+    if (metricToMerge != null) {
+      metricToMerge.getMetricValues().put(metric.getTimestamp(), 
metric.getValue());
+      if (metricToMerge.getTimestamp() > metric.getTimestamp()) {
+        metricToMerge.setTimestamp(metric.getTimestamp());
+      }
+      if (metricToMerge.getStartTime() > metric.getStartTime()) {
+        metricToMerge.setStartTime(metric.getStartTime());
+      }
+    } else {
+      allMetrics.add(metric.getTimelineMetric());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/c0da0a1d/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index 447f6f9..f5d6bc0 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -191,8 +191,7 @@ public class HBaseTimelineMetricStore extends 
AbstractService
     return metricValues;
   }
 
-  public static HashMap<String, List<Function>>
-  parseMetricNamesToAggregationFunctions(List<String> metricNames) {
+  public static HashMap<String, List<Function>> 
parseMetricNamesToAggregationFunctions(List<String> metricNames) {
     HashMap<String, List<Function>> metricsFunctions = new HashMap<String,
       List<Function>>();
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/c0da0a1d/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 7258cad..b890171 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
@@ -39,7 +40,6 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
-
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -50,9 +50,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
-
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
@@ -132,23 +130,23 @@ public class PhoenixHBaseAccessor {
 
   private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet 
rs)
     throws SQLException, IOException {
-    TimelineMetric metric = TIMELINE_METRIC_READ_HELPER
-      .getTimelineMetricCommonsFromResultSet(rs);
+    TimelineMetric metric = 
TIMELINE_METRIC_READ_HELPER.getTimelineMetricCommonsFromResultSet(rs);
     
metric.setMetricValues(readLastMetricValueFromJSON(rs.getString("METRICS")));
-
     return metric;
   }
 
-  public static TimelineMetric getAggregatedTimelineMetricFromResultSet(
-    ResultSet rs, Function f) throws SQLException, IOException {
+  public static SingleValuedTimelineMetric 
getAggregatedTimelineMetricFromResultSet(
+      ResultSet rs, Function f) throws SQLException, IOException {
 
-    TimelineMetric metric = new TimelineMetric();
-    metric.setHostName(rs.getString("HOSTNAME"));
-    metric.setAppId(rs.getString("APP_ID"));
-    metric.setInstanceId(rs.getString("INSTANCE_ID"));
-    metric.setTimestamp(rs.getLong("SERVER_TIME"));
-    metric.setStartTime(rs.getLong("SERVER_TIME"));
-    metric.setType(rs.getString("UNITS"));
+    SingleValuedTimelineMetric metric = new SingleValuedTimelineMetric(
+      rs.getString("METRIC_NAME") + f.getSuffix(),
+      rs.getString("APP_ID"),
+      rs.getString("INSTANCE_ID"),
+      rs.getString("HOSTNAME"),
+      rs.getLong("SERVER_TIME"),
+      rs.getLong("SERVER_TIME"),
+      rs.getString("UNITS")
+    );
 
     // get functions for metricnames
 
@@ -171,11 +169,8 @@ public class PhoenixHBaseAccessor {
         break;
     }
 
-    metric.setMetricName(rs.getString("METRIC_NAME") + f.getSuffix());
+    metric.setSingleTimeseriesValue(rs.getLong("SERVER_TIME"), value);
 
-    Map<Long, Double> valueMap = new TreeMap<Long, Double>();
-    valueMap.put(rs.getLong("SERVER_TIME"), value);
-    metric.setMetricValues(valueMap);
     return metric;
   }
 
@@ -194,30 +189,6 @@ public class PhoenixHBaseAccessor {
     return (Map<Long, Double>) mapper.readValue(json, metricValuesTypeRef);
   }
 
-  public static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs)
-    throws SQLException, IOException {
-    TimelineMetric metric = new TimelineMetric();
-    metric.setMetricName(rs.getString("METRIC_NAME"));
-    metric.setAppId(rs.getString("APP_ID"));
-    metric.setInstanceId(rs.getString("INSTANCE_ID"));
-    metric.setHostName(rs.getString("HOSTNAME"));
-    metric.setTimestamp(rs.getLong("SERVER_TIME"));
-    metric.setType(rs.getString("UNITS"));
-    return metric;
-  }
-
-  public static MetricHostAggregate 
getMetricHostAggregateFromResultSet(ResultSet rs)
-    throws SQLException {
-    MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
-    metricHostAggregate.setSum(rs.getDouble("METRIC_SUM"));
-    metricHostAggregate.setMax(rs.getDouble("METRIC_MAX"));
-    metricHostAggregate.setMin(rs.getDouble("METRIC_MIN"));
-    metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT"));
-
-    metricHostAggregate.setDeviation(0.0);
-    return metricHostAggregate;
-  }
-
   private Connection getConnectionRetryingOnException()
     throws SQLException, InterruptedException {
     RetryCounter retryCounter = retryCounterFactory.create();
@@ -465,9 +436,9 @@ public class PhoenixHBaseAccessor {
   }
 
   private void appendMetricFromResultSet(
-    TimelineMetrics metrics, Condition condition, Map<String,
-    List<Function>> metricFunctions, ResultSet rs)
-    throws SQLException, IOException {
+      TimelineMetrics metrics, Condition condition, Map<String,
+      List<Function>> metricFunctions, ResultSet rs)
+      throws SQLException, IOException {
     if (condition.getPrecision() == Precision.HOURS
       || condition.getPrecision() == Precision.MINUTES) {
 
@@ -475,14 +446,12 @@ public class PhoenixHBaseAccessor {
       List<Function> functions = metricFunctions.get(metricName);
 
       for (Function f : functions) {
-        TimelineMetric metric;
-
-        metric = getAggregatedTimelineMetricFromResultSet(rs, f);
+        SingleValuedTimelineMetric metric = 
getAggregatedTimelineMetricFromResultSet(rs, f);
 
         if (condition.isGrouped()) {
           metrics.addOrMergeTimelineMetric(metric);
         } else {
-          metrics.getMetrics().add(metric);
+          metrics.getMetrics().add(metric.getTimelineMetric());
         }
       }
     }
@@ -540,10 +509,8 @@ public class PhoenixHBaseAccessor {
    * @return @TimelineMetrics
    * @throws SQLException
    */
-  public TimelineMetrics getAggregateMetricRecords(
-    final Condition condition,
-    Map<String, List<Function>> metricFunctions)
-    throws SQLException {
+  public TimelineMetrics getAggregateMetricRecords(final Condition condition,
+      Map<String, List<Function>> metricFunctions) throws SQLException {
 
     validateConditionIsNotEmpty(condition);
 
@@ -555,14 +522,13 @@ public class PhoenixHBaseAccessor {
     try {
       //get latest
       if(condition.isPointInTime()) {
-        stmt = getLatestAggregateMetricRecords(condition, conn, metrics);
+        stmt = getLatestAggregateMetricRecords(condition, conn, metrics, 
metricFunctions);
       } else {
         stmt = PhoenixTransactSQL.prepareGetAggregateSqlStmt(conn, condition);
 
         rs = stmt.executeQuery();
         while (rs.next()) {
-          appendAggregateMetricFromResultSet(metrics, condition,
-            metricFunctions, rs);
+          appendAggregateMetricFromResultSet(metrics, condition, 
metricFunctions, rs);
         }
       }
     } finally {
@@ -593,34 +559,34 @@ public class PhoenixHBaseAccessor {
     return metrics;
   }
 
-  private void appendAggregateMetricFromResultSet(
-    TimelineMetrics metrics, Condition condition,
-    Map<String, List<Function>> metricFunctions, ResultSet rs)
-    throws SQLException {
+  private void appendAggregateMetricFromResultSet(TimelineMetrics metrics,
+      Condition condition, Map<String, List<Function>> metricFunctions,
+      ResultSet rs) throws SQLException {
 
     String metricName = rs.getString("METRIC_NAME");
     List<Function> functions = metricFunctions.get(metricName);
 
     for (Function aggregateFunction : functions) {
-      TimelineMetric metric;
+      SingleValuedTimelineMetric metric;
 
-      if (condition.getPrecision() == Precision.HOURS) {
-        metric = getAggregateHoursTimelineMetricFromResultSet(rs, 
aggregateFunction);
+      if (condition.getPrecision() == Precision.HOURS
+          || condition.getPrecision() == Precision.DAYS) {
+        metric = getAggregateTimelineMetricFromResultSet(rs, 
aggregateFunction, false);
       } else {
-        metric = getAggregateTimelineMetricFromResultSet(rs, 
aggregateFunction);
+        metric = getAggregateTimelineMetricFromResultSet(rs, 
aggregateFunction, true);
       }
 
       if (condition.isGrouped()) {
         metrics.addOrMergeTimelineMetric(metric);
       } else {
-        metrics.getMetrics().add(metric);
+        metrics.getMetrics().add(metric.getTimelineMetric());
       }
     }
   }
 
-  private PreparedStatement getLatestAggregateMetricRecords(
-    Condition condition, Connection conn, TimelineMetrics metrics)
-    throws SQLException {
+  private PreparedStatement getLatestAggregateMetricRecords(Condition 
condition,
+      Connection conn, TimelineMetrics metrics,
+      Map<String, List<Function>> metricFunctions) throws SQLException {
 
     PreparedStatement stmt = null;
     SplitByMetricNamesCondition splitCondition =
@@ -629,15 +595,28 @@ public class PhoenixHBaseAccessor {
     for (String metricName: splitCondition.getOriginalMetricNames()) {
 
       splitCondition.setCurrentMetric(metricName);
-      stmt = PhoenixTransactSQL.prepareGetLatestAggregateMetricSqlStmt(conn,
-        splitCondition);
+      stmt = PhoenixTransactSQL.prepareGetLatestAggregateMetricSqlStmt(conn, 
splitCondition);
       ResultSet rs = null;
       try {
         rs = stmt.executeQuery();
         while (rs.next()) {
-          TimelineMetric metric = getAggregateTimelineMetricFromResultSet(rs,
-            new Function());
-          metrics.getMetrics().add(metric);
+          List<Function> functions = metricFunctions.get(metricName);
+          if (functions != null) {
+            for (Function f : functions) {
+              SingleValuedTimelineMetric metric =
+                getAggregateTimelineMetricFromResultSet(rs, f, true);
+
+              if (condition.isGrouped()) {
+                metrics.addOrMergeTimelineMetric(metric);
+              } else {
+                metrics.getMetrics().add(metric.getTimelineMetric());
+              }
+            }
+          } else {
+            SingleValuedTimelineMetric metric =
+              getAggregateTimelineMetricFromResultSet(rs, new Function(), 
true);
+            metrics.getMetrics().add(metric.getTimelineMetric());
+          }
         }
       } finally {
         if (rs != null) {
@@ -653,54 +632,28 @@ public class PhoenixHBaseAccessor {
     return stmt;
   }
 
-  private TimelineMetric getAggregateTimelineMetricFromResultSet(
-    ResultSet rs, Function f) throws SQLException {
-    TimelineMetric metric = new TimelineMetric();
-    metric.setAppId(rs.getString("APP_ID"));
-    metric.setInstanceId(rs.getString("INSTANCE_ID"));
-    metric.setTimestamp(rs.getLong("SERVER_TIME"));
-    metric.setStartTime(rs.getLong("SERVER_TIME"));
+  private SingleValuedTimelineMetric 
getAggregateTimelineMetricFromResultSet(ResultSet rs,
+      Function f, boolean useHostCount) throws SQLException {
 
-    double value;
-    switch(f.getReadFunction()){
-      case AVG:
-        value = rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT");
-        break;
-      case MIN:
-        value = rs.getDouble("METRIC_MIN");
-        break;
-      case MAX:
-        value = rs.getDouble("METRIC_MAX");
-        break;
-      case SUM:
-        value = rs.getDouble("METRIC_SUM");
-        break;
-      default:
-        value = rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT");
-        break;
+    String countColumnName = "METRIC_COUNT";
+    if (useHostCount) {
+      countColumnName = "HOSTS_COUNT";
     }
 
-    metric.setMetricName(rs.getString("METRIC_NAME") + f.getSuffix());
-
-    Map<Long, Double> valueMap = new TreeMap<Long, Double>();
-    valueMap.put(rs.getLong("SERVER_TIME"), value);
-    metric.setMetricValues(valueMap);
-
-    return metric;
-  }
-
-  private TimelineMetric getAggregateHoursTimelineMetricFromResultSet(
-    ResultSet rs, Function f) throws SQLException {
-    TimelineMetric metric = new TimelineMetric();
-    metric.setAppId(rs.getString("APP_ID"));
-    metric.setInstanceId(rs.getString("INSTANCE_ID"));
-    metric.setTimestamp(rs.getLong("SERVER_TIME"));
-    metric.setStartTime(rs.getLong("SERVER_TIME"));
+    SingleValuedTimelineMetric metric = new SingleValuedTimelineMetric(
+      rs.getString("METRIC_NAME") + f.getSuffix(),
+      rs.getString("APP_ID"),
+      rs.getString("INSTANCE_ID"),
+      null,
+      rs.getLong("SERVER_TIME"),
+      rs.getLong("SERVER_TIME"),
+      rs.getString("UNITS")
+    );
 
     double value;
     switch(f.getReadFunction()){
       case AVG:
-        value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
+        value = rs.getDouble("METRIC_SUM") / rs.getInt(countColumnName);
         break;
       case MIN:
         value = rs.getDouble("METRIC_MIN");
@@ -712,15 +665,11 @@ public class PhoenixHBaseAccessor {
         value = rs.getDouble("METRIC_SUM");
         break;
       default:
-        value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
+        value = rs.getDouble("METRIC_SUM") / rs.getInt(countColumnName);
         break;
     }
 
-    metric.setMetricName(rs.getString("METRIC_NAME") + f.getSuffix());
-
-    Map<Long, Double> valueMap = new TreeMap<Long, Double>();
-    valueMap.put(rs.getLong("SERVER_TIME"), value);
-    metric.setMetricValues(valueMap);
+    metric.setSingleTimeseriesValue(rs.getLong("SERVER_TIME"), value);
 
     return metric;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/c0da0a1d/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
index 18e5f18..796cb72 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
@@ -34,6 +34,7 @@ import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 
 public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
   private static final Log LOG = 
LogFactory.getLog(TimelineMetricHostAggregator.class);
+  TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
   public TimelineMetricHostAggregator(PhoenixHBaseAccessor hBaseAccessor,
                                       Configuration metricsConf,
@@ -79,14 +80,14 @@ public class TimelineMetricHostAggregator extends 
AbstractTimelineAggregator {
       throws IOException, SQLException {
     TimelineMetric existingMetric = null;
     MetricHostAggregate hostAggregate = null;
-    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
-      new HashMap<TimelineMetric, MetricHostAggregate>();
+    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =  new 
HashMap<TimelineMetric, MetricHostAggregate>();
+
 
     while (rs.next()) {
       TimelineMetric currentMetric =
-        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+        readHelper.getTimelineMetricKeyFromResultSet(rs);
       MetricHostAggregate currentHostAggregate =
-        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+        readHelper.getMetricHostAggregateFromResultSet(rs);
 
       if (existingMetric == null) {
         // First row

http://git-wip-us.apache.org/repos/asf/ambari/blob/c0da0a1d/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
index 25f8c62..398f4c3 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
@@ -38,7 +38,7 @@ public class TimelineMetricReadHelper {
   }
 
   public TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
-    throws SQLException, IOException {
+      throws SQLException, IOException {
     TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs);
     Map<Long, Double> sortedByTimeMetrics = new TreeMap<Long, Double>(
         PhoenixHBaseAccessor.readMetricFromJSON(rs.getString("METRICS")));
@@ -50,7 +50,7 @@ public class TimelineMetricReadHelper {
    * Returns common part of timeline metrics record without the values.
    */
   public TimelineMetric getTimelineMetricCommonsFromResultSet(ResultSet rs)
-    throws SQLException {
+      throws SQLException {
     TimelineMetric metric = new TimelineMetric();
     metric.setMetricName(rs.getString("METRIC_NAME"));
     metric.setAppId(rs.getString("APP_ID"));
@@ -76,7 +76,7 @@ public class TimelineMetricReadHelper {
   }
 
   public MetricClusterAggregate 
getMetricClusterTimeAggregateFromResultSet(ResultSet rs)
-    throws SQLException {
+      throws SQLException {
     MetricClusterAggregate agg = new MetricClusterAggregate();
     agg.setSum(rs.getDouble("METRIC_SUM"));
     agg.setMax(rs.getDouble("METRIC_MAX"));
@@ -98,5 +98,28 @@ public class TimelineMetricReadHelper {
       rs.getString("UNITS"));
   }
 
+  public MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs)
+      throws SQLException {
+    MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
+    metricHostAggregate.setSum(rs.getDouble("METRIC_SUM"));
+    metricHostAggregate.setMax(rs.getDouble("METRIC_MAX"));
+    metricHostAggregate.setMin(rs.getDouble("METRIC_MIN"));
+    metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT"));
+
+    metricHostAggregate.setDeviation(0.0);
+    return metricHostAggregate;
+  }
+
+  public TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs)
+      throws SQLException, IOException {
+    TimelineMetric metric = new TimelineMetric();
+    metric.setMetricName(rs.getString("METRIC_NAME"));
+    metric.setAppId(rs.getString("APP_ID"));
+    metric.setInstanceId(rs.getString("INSTANCE_ID"));
+    metric.setHostName(rs.getString("HOSTNAME"));
+    metric.setTimestamp(rs.getLong("SERVER_TIME"));
+    metric.setType(rs.getString("UNITS"));
+    return metric;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/c0da0a1d/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
index aa276f3..fb3bc30 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
@@ -135,7 +135,6 @@ public class ITClusterAggregator extends 
AbstractMiniHBaseClusterTest {
     }
   }
 
-
   @Test
   public void testShouldAggregateClusterIgnoringInstance() throws Exception {
     // GIVEN

http://git-wip-us.apache.org/repos/asf/ambari/blob/c0da0a1d/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
index f4f2223..5f646fe 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
@@ -23,6 +23,7 @@ import 
org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
@@ -107,6 +108,7 @@ public class ITMetricAggregator extends 
AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator aggregatorMinute =
       
TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb, new 
Configuration());
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime;
@@ -137,9 +139,9 @@ public class ITMetricAggregator extends 
AbstractMiniHBaseClusterTest {
     int count = 0;
     while (rs.next()) {
       TimelineMetric currentMetric =
-        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+        readHelper.getTimelineMetricKeyFromResultSet(rs);
       MetricHostAggregate currentHostAggregate =
-        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+        readHelper.getMetricHostAggregateFromResultSet(rs);
 
       if ("disk_free".equals(currentMetric.getMetricName())) {
         assertEquals(2.0, currentHostAggregate.getMax());
@@ -167,6 +169,7 @@ public class ITMetricAggregator extends 
AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator aggregator =
       
TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb, new 
Configuration());
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
     long startTime = System.currentTimeMillis();
 
     MetricHostAggregate expectedAggregate =
@@ -210,9 +213,9 @@ public class ITMetricAggregator extends 
AbstractMiniHBaseClusterTest {
 
     while (rs.next()) {
       TimelineMetric currentMetric =
-        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+        readHelper.getTimelineMetricKeyFromResultSet(rs);
       MetricHostAggregate currentHostAggregate =
-        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+        readHelper.getMetricHostAggregateFromResultSet(rs);
 
       if ("disk_used".equals(currentMetric.getMetricName())) {
         assertEquals(2.0, currentHostAggregate.getMax());
@@ -229,6 +232,7 @@ public class ITMetricAggregator extends 
AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator aggregator =
       TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hdb, 
new Configuration());
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
     long startTime = System.currentTimeMillis();
 
     MetricHostAggregate expectedAggregate =
@@ -270,9 +274,9 @@ public class ITMetricAggregator extends 
AbstractMiniHBaseClusterTest {
 
     while (rs.next()) {
       TimelineMetric currentMetric =
-        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+        readHelper.getTimelineMetricKeyFromResultSet(rs);
       MetricHostAggregate currentHostAggregate =
-        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+        readHelper.getMetricHostAggregateFromResultSet(rs);
 
       if ("disk_used".equals(currentMetric.getMetricName())) {
         assertEquals(2.0, currentHostAggregate.getMax());
@@ -301,9 +305,7 @@ public class ITMetricAggregator extends 
AbstractMiniHBaseClusterTest {
     return metrics;
   }
 
-  private TimelineMetric createMetric(long startTime,
-                                      String metricName,
-                                      String host) {
+  private TimelineMetric createMetric(long startTime, String metricName, 
String host) {
     TimelineMetric m = new TimelineMetric();
     m.setAppId("host");
     m.setHostName(host);

http://git-wip-us.apache.org/repos/asf/ambari/blob/c0da0a1d/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
index 4c56f77..00db767 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
@@ -86,20 +86,20 @@ public class ITPhoenixHBaseAccessor extends 
AbstractMiniHBaseClusterTest {
     long ctime = startTime;
     long minute = 60 * 1000;
     hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
-        "disk_free", 1));
+      "disk_free", 1));
     hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
-        "disk_free", 2));
+      "disk_free", 2));
     ctime += minute;
     hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
-        "disk_free", 2));
+      "disk_free", 2));
     hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
-        "disk_free", 1));
+      "disk_free", 1));
 
     // WHEN
     long endTime = ctime + minute;
     Condition condition = new DefaultCondition(
-        Collections.singletonList("disk_free"), "local1", null, null, 
startTime,
-        endTime, Precision.SECONDS, null, true);
+      Collections.singletonList("disk_free"), "local1", null, null, startTime,
+      endTime, Precision.SECONDS, null, true);
     TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition,
       singletonValueFunctionMap("disk_free"));
 
@@ -240,6 +240,46 @@ public class ITPhoenixHBaseAccessor extends 
AbstractMiniHBaseClusterTest {
   }
 
   @Test
+  public void testGetClusterMetricRecordLatestWithFunction() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator agg =
+      
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new 
Configuration());
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime + 1;
+    long minute = 60 * 1000;
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+      "disk_free", 1));
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+      "disk_free", 2));
+    ctime += minute;
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+      "disk_free", 2));
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+      "disk_free", 1));
+
+    long endTime = ctime + minute + 1;
+    boolean success = agg.doWork(startTime, endTime);
+    assertTrue(success);
+
+    // WHEN
+    Condition condition = new DefaultCondition(
+      Collections.singletonList("disk_free"), null, null, null,
+      null, null, Precision.SECONDS, null, true);
+    TimelineMetrics timelineMetrics = hdb.getAggregateMetricRecords(condition,
+      Collections.singletonMap("disk_free",
+        Collections.singletonList(new Function(Function.ReadFunction.SUM, 
null))));
+
+    //THEN
+    assertEquals(1, timelineMetrics.getMetrics().size());
+    TimelineMetric metric = timelineMetrics.getMetrics().get(0);
+
+    assertEquals("disk_free._sum", metric.getMetricName());
+    assertEquals(1, metric.getMetricValues().size());
+    assertEquals(3, 
metric.getMetricValues().values().iterator().next().intValue());
+  }
+
+  @Test
   public void testGetClusterMetricRecordsHours() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =

Reply via email to